// Copyright Epic Games, Inc. All Rights Reserved. #include "compactcas.h" #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { struct CasDiskIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint32_t PayloadAlignment = 0; uint32_t Reserved0 = 0; uint64_t EntryCount = 0; uint32_t Reserved1 = 0; uint32_t Reserved2 = 0; }; static_assert(sizeof(CasDiskIndexHeader) == 32); namespace { std::vector MakeCasDiskEntries(const std::unordered_map& MovedChunks, const std::vector& DeletedChunks) { std::vector result; result.reserve(MovedChunks.size()); for (const auto& MovedEntry : MovedChunks) { result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); } for (const IoHash& ChunkHash : DeletedChunks) { result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone}); } return result; } const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; const char* DataExtension = ".ucas"; std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / ContainerBaseName; } std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); } std::filesystem::path GetTempLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); } std::filesystem::path GetRecoverLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".recover" + LogExtension); } std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) { ExtendablePathBuilder<256> Path; char BlockHexString[9]; ToHexNumber(BlockIndex, BlockHexString); Path.Append(BlocksBasePath); Path.AppendSeparator(); Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); Path.AppendSeparator(); Path.Append(BlockHexString); Path.Append(".ucas"); return Path.ToPath(); } std::filesystem::path GetGCReservePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".gc.reserve" + DataExtension); } std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / (ContainerBaseName + LogExtension); } std::filesystem::path GetLegacyUcasPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / (ContainerBaseName + DataExtension); } std::filesystem::path GetLegacyUidxPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / (ContainerBaseName + IndexExtension); } struct LegacyCasDiskLocation { LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize) { ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); ZEN_ASSERT(InSize <= 0xff'ffff'ffff); memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); memcpy(&m_Size[0], &InSize, sizeof m_Size); } LegacyCasDiskLocation() = default; inline uint64_t GetOffset() const { uint64_t Offset = 0; memcpy(&Offset, &m_Offset, sizeof m_Offset); return Offset; } inline uint64_t GetSize() const { uint64_t Size = 0; memcpy(&Size, &m_Size, sizeof m_Size); return Size; } private: uint8_t m_Offset[5]; uint8_t m_Size[5]; }; struct LegacyCasDiskIndexEntry { static const uint8_t kTombstone = 0x01; IoHash Key; LegacyCasDiskLocation Location; ZenContentType ContentType = ZenContentType::kUnknownContentType; uint8_t Flags = 0; }; std::vector ReadIndexFile(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName, uint64_t& InOutPayloadAlignment) { std::vector Entries; std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName); if (std::filesystem::is_regular_file(SidxPath)) { Stopwatch Timer; const auto _ = MakeGuard([RootDirectory, ContainerBaseName, &Entries, &Timer] { ZEN_INFO("read store {} index containing {} entries in {}", RootDirectory / ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(SidxPath, BasicFile::EMode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CasDiskIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); InOutPayloadAlignment = Header.PayloadAlignment; } } } return Entries; } std::vector ReadLog(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName) { std::vector Entries; std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName); if (std::filesystem::is_regular_file(SlogPath)) { Stopwatch Timer; const auto _ = MakeGuard([RootDirectory, ContainerBaseName, &Entries, &Timer] { ZEN_INFO("read store {} log containing {} entries in {}", RootDirectory / ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(SlogPath, CasLogFile::EMode::kRead); CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }); } return Entries; } std::vector MigrateLegacyData(const std::filesystem::path& RootPath, const std::string& ContainerBaseName, uint64_t MaxBlockSize, uint64_t PayloadAlignment, bool CleanSource, const std::unordered_set& ExistingChunks) { std::filesystem::path BlocksBasePath = GetBlocksBasePath(RootPath, ContainerBaseName); std::filesystem::path LegacyLogPath = GetLegacyLogPath(RootPath, ContainerBaseName); std::filesystem::path LegacySobsPath = GetLegacyUcasPath(RootPath, ContainerBaseName); std::filesystem::path LegacySidxPath = GetLegacyUidxPath(RootPath, ContainerBaseName); uint64_t MigratedChunkCount = 0; uint32_t MigratedBlockCount = 0; uint32_t NewBlockIndex = 0; Stopwatch MigrationTimer; uint64_t TotalSize = 0; const auto _ = MakeGuard([RootPath, ContainerBaseName, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { ZEN_INFO("migrated store {} to {} chunks in {} blocks in {} ({})", RootPath / ContainerBaseName, MigratedChunkCount, MigratedBlockCount, NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), NiceBytes(TotalSize)); }); std::vector Result; uint32_t WriteBlockIndex = 0; while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } std::error_code Error; DiskSpace Space = DiskSpaceInfo(RootPath, Error); if (Error) { ZEN_ERROR("get disk space in {} FAILED, reason '{}'", ContainerBaseName, Error.message()); return Result; } if (Space.Free < MaxBlockSize) { ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", RootPath / ContainerBaseName, MaxBlockSize, NiceBytes(Space.Free)); return Result; } BasicFile BlockFile; BlockFile.Open(LegacySobsPath, BasicFile::EMode::kRead); uint64_t FileSize = BlockFile.FileSize(); std::unordered_map LegacyDiskIndex; TCasLogFile LegacyCasLog; LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::EMode::kWrite : CasLogFile::EMode::kRead); { Stopwatch Timer; const auto __ = MakeGuard([RootPath, ContainerBaseName, &LegacyDiskIndex, &Timer] { ZEN_INFO("read store {} legacy index containing {} entries in {}", RootPath / ContainerBaseName, LegacyDiskIndex.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); LegacyCasLog.Replay([&](const LegacyCasDiskIndexEntry& Record) { if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) { LegacyDiskIndex.erase(Record.Key); return; } uint64_t EntryEnd = Record.Location.GetOffset() + Record.Location.GetSize(); if (EntryEnd > FileSize) { return; } if (ExistingChunks.contains(Record.Key)) { return; } LegacyDiskIndex[Record.Key] = Record; }); } if (LegacyDiskIndex.empty()) { LegacyCasLog.Close(); if (CleanSource) { std::filesystem::remove(LegacyLogPath); BlockFile.Close(); std::filesystem::remove(LegacySobsPath); std::filesystem::remove(LegacySidxPath); } return Result; } for (const auto& Entry : LegacyDiskIndex) { const LegacyCasDiskIndexEntry& Record(Entry.second); TotalSize += Record.Location.GetSize(); } uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * LegacyDiskIndex.size()); uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) { ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", RootPath / ContainerBaseName, MaxRequiredBlockCount, BlockStoreDiskLocation::MaxBlockIndex); return Result; } if (CleanSource) { if (Space.Free < (MaxBlockSize + (1 << 28))) { ZEN_INFO("legacy store migration from {} aborted, not enough disk space available {} ({})", RootPath / ContainerBaseName, NewBlockIndex + 1, NiceBytes(MaxBlockSize + (1 << 28)), NiceBytes(Space.Free)); return Result; } } else { if (Space.Free < (RequiredDiskSpace + (1 << 28))) { ZEN_INFO("legacy store migration from {} aborted, not enough disk space available {} ({})", RootPath / ContainerBaseName, NewBlockIndex + 1, NiceBytes(RequiredDiskSpace + (1 << 28)), NiceBytes(Space.Free)); return Result; } } std::filesystem::path SlogPath = GetLogPath(RootPath, ContainerBaseName); CreateDirectories(SlogPath.parent_path()); TCasLogFile CasLog; CasLog.Open(SlogPath, CasLogFile::EMode::kWrite); if (CleanSource && (MaxRequiredBlockCount < 2)) { Result.reserve(LegacyDiskIndex.size()); // We can use the block as is, just move it and add the blocks to our new log for (auto& Entry : LegacyDiskIndex) { const LegacyCasDiskIndexEntry& Record(Entry.second); BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()); BlockStoreDiskLocation NewLocation(NewChunkLocation, PayloadAlignment); Result.push_back( {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); } std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex); CreateDirectories(BlockPath.parent_path()); BlockFile.Close(); std::filesystem::rename(LegacySobsPath, BlockPath); CasLog.Append(Result); MigratedChunkCount += Result.size(); MigratedBlockCount++; } else { std::vector ChunkHashes; ChunkHashes.reserve(LegacyDiskIndex.size()); for (const auto& Entry : LegacyDiskIndex) { ChunkHashes.push_back(Entry.first); } std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { auto LhsKeyIt = LegacyDiskIndex.find(Lhs); auto RhsKeyIt = LegacyDiskIndex.find(Rhs); return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); }); uint64_t BlockSize = 0; uint64_t BlockOffset = 0; std::vector NewLocations; struct BlockData { std::vector> Chunks; uint64_t BlockOffset; uint64_t BlockSize; uint32_t BlockIndex; }; std::vector BlockRanges; std::vector> Chunks; BlockRanges.reserve(MaxRequiredBlockCount); for (const IoHash& ChunkHash : ChunkHashes) { const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); uint64_t ChunkSize = LegacyChunkLocation.GetSize(); uint64_t ChunkEnd = ChunkOffset + ChunkSize; if (BlockSize == 0) { BlockOffset = ChunkOffset; } if ((ChunkEnd - BlockOffset) > MaxBlockSize) { BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; BlockRange.Chunks.swap(Chunks); BlockRanges.push_back(BlockRange); WriteBlockIndex++; while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } BlockOffset = ChunkOffset; BlockSize = 0; } BlockSize = RoundUp(BlockSize, PayloadAlignment); BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = BlockSize, .Size = ChunkSize}; Chunks.push_back({ChunkHash, ChunkLocation}); BlockSize += ChunkEnd - BlockOffset; } if (BlockSize > 0) { BlockRanges.push_back( {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); } Stopwatch WriteBlockTimer; std::reverse(BlockRanges.begin(), BlockRanges.end()); std::vector Buffer(1 << 28); for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) { const BlockData& BlockRange = BlockRanges[Idx]; uint64_t MSPerBlock = Idx > 0 ? (WriteBlockTimer.GetElapsedTimeMs() / Idx) : 10000; uint64_t ETA = (BlockRanges.size() - Idx) * MSPerBlock; ZEN_INFO("migrating store {} {}/{} blocks, remaining {} ({}) ETA: {}", RootPath / ContainerBaseName, Idx, BlockRanges.size(), NiceBytes(BlockRange.BlockOffset), NiceBytes(TotalSize), NiceTimeSpanMs(ETA)); std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); BlockStoreFile ChunkBlock(BlockPath); ChunkBlock.Create(BlockRange.BlockSize); uint64_t Offset = 0; while (Offset < BlockRange.BlockSize) { uint64_t Size = BlockRange.BlockSize - Offset; if (Size > Buffer.size()) { Size = Buffer.size(); } BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); ChunkBlock.Write(Buffer.data(), Size, Offset); Offset += Size; } ChunkBlock.Flush(); std::vector LogEntries; LogEntries.reserve(BlockRange.Chunks.size()); for (const auto& Entry : BlockRange.Chunks) { const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; BlockStoreDiskLocation Location(Entry.second, PayloadAlignment); LogEntries.push_back( {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); } CasLog.Append(LogEntries); Result.insert(Result.end(), LogEntries.begin(), LogEntries.end()); MigratedChunkCount += LogEntries.size(); MigratedBlockCount++; if (CleanSource) { std::vector LegacyLogEntries; LegacyLogEntries.reserve(BlockRange.Chunks.size()); for (const auto& Entry : BlockRange.Chunks) { LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); } LegacyCasLog.Append(LegacyLogEntries); BlockFile.SetFileSize(BlockRange.BlockOffset); } } } LegacyCasLog.Close(); CasLog.Close(); if (CleanSource) { std::filesystem::remove(LegacyLogPath); BlockFile.Close(); std::filesystem::remove(LegacySobsPath); std::filesystem::remove(LegacySidxPath); } return Result; } } // namespace ////////////////////////////////////////////////////////////////////////// CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc) , m_Config(Config) , m_Log(logging::Get("containercas")) { } CasContainerStrategy::~CasContainerStrategy() { } void CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); ZEN_ASSERT(MaxBlockSize > 0); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); m_IsInitialized = true; } CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { uint32_t WriteBlockIndex; Ref WriteBlock; uint64_t InsertOffset; { RwLock::ExclusiveLockScope _(m_InsertLock); { RwLock::SharedLockScope __(m_LocationMapLock); if (m_LocationMap.contains(ChunkHash)) { return CasStore::InsertResult{.New = false}; } } // New entry WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); bool IsWriting = m_WriteBlock != nullptr; if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) { if (m_WriteBlock) { m_WriteBlock = nullptr; } { RwLock::ExclusiveLockScope __(m_LocationMapLock); if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); } WriteBlockIndex += IsWriting ? 1 : 0; while (m_ChunkBlocks.contains(WriteBlockIndex)) { WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); m_WriteBlock = new BlockStoreFile(BlockPath); m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); } m_CurrentInsertOffset = 0; m_WriteBlock->Create(m_MaxBlockSize); } InsertOffset = m_CurrentInsertOffset; m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); WriteBlock = m_WriteBlock; } // We can end up in a situation that InsertChunk writes the same chunk data in // different locations. // We release the insert lock once we have the correct WriteBlock ready and we know // where to write the data. If a new InsertChunk request for the same chunk hash/data // comes in before we update m_LocationMap below we will have a race. // The outcome of that is that we will write the chunk data in more than one location // but the chunk hash will only point to one of the chunks. // We will in that case waste space until the next GC operation. // // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); m_CasLog.Append(IndexEntry); m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order_release); { RwLock::ExclusiveLockScope __(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, Location); } return CasStore::InsertResult{.New = true}; } CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { Ref ChunkBlock; BlockStoreLocation Location; { RwLock::SharedLockScope _(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { Location = KeyIt->second.Get(m_PayloadAlignment); ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; } else { return IoBuffer(); } } return ChunkBlock->GetChunk(Location.Offset, Location.Size); } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); return m_LocationMap.contains(ChunkHash); } void CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would // benefit from a better implementation which removes // items incrementally for large sets, especially when // we're likely to already have a large proportion of the // chunks in the set InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void CasContainerStrategy::Flush() { { RwLock::ExclusiveLockScope _(m_InsertLock); if (m_CurrentInsertOffset > 0) { uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; m_WriteBlock = nullptr; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; } } MakeIndexSnapshot(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { std::vector BadChunks; // We do a read sweep through the payloads file and validate // any entries that are contained within each segment, with // the assumption that most entries will be checked in this // pass. An alternative strategy would be to use memory mapping. { std::vector BigChunks; const uint64_t WindowSize = 4 * 1024 * 1024; IoBuffer ReadBuffer{WindowSize}; void* BufferBase = ReadBuffer.MutableData(); RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? RwLock::SharedLockScope __(m_LocationMapLock); for (const auto& Block : m_ChunkBlocks) { uint64_t WindowStart = 0; uint64_t WindowEnd = WindowSize; const Ref& BlockFile = Block.second; BlockFile->Open(); const uint64_t FileSize = BlockFile->FileSize(); do { const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); BlockFile->Read(BufferBase, ChunkSize, WindowStart); for (auto& Entry : m_LocationMap) { const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment); const uint64_t EntryOffset = Location.Offset; if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { const uint64_t EntryEnd = EntryOffset + Location.Size; if (EntryEnd >= WindowEnd) { BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); continue; } const IoHash ComputedHash = IoHash::HashBuffer(reinterpret_cast(BufferBase) + Location.Offset - WindowStart, Location.Size); if (Entry.first != ComputedHash) { // Hash mismatch BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone}); } } } WindowStart += WindowSize; WindowEnd += WindowSize; } while (WindowStart < FileSize); } // Deal with large chunks for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); const Ref& BlockFile = m_ChunkBlocks[Location.BlockIndex]; BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); if (Entry.Key != ComputedHash) { BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); } } } if (BadChunks.empty()) { return; } ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); // Deal with bad chunks by removing them from our lookup map std::vector BadChunkHashes; BadChunkHashes.reserve(BadChunks.size()); m_CasLog.Append(BadChunks); { RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& Entry : BadChunks) { BadChunkHashes.push_back(Entry.Key); m_LocationMap.erase(Entry.Key); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCasChunks(BadChunkHashes); } void CasContainerStrategy::UpdateLocations(const std::span& Entries) { for (const CasDiskIndexEntry& Entry : Entries) { if (Entry.Flags & CasDiskIndexEntry::kTombstone) { auto KeyIt = m_LocationMap.find(Entry.Key); uint64_t ChunkSize = KeyIt->second.GetSize(); m_TotalSize.fetch_sub(ChunkSize); m_LocationMap.erase(KeyIt); continue; } m_LocationMap[Entry.Key] = Entry.Location; } } void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { // It collects all the blocks that we want to delete chunks from. For each such // block we keep a list of chunks to retain and a list of chunks to delete. // // If there is a block that we are currently writing to, that block is omitted // from the garbage collection. // // Next it will iterate over all blocks that we want to remove chunks from. // If the block is empty after removal of chunks we mark the block as pending // delete - we want to delete it as soon as there are no IoBuffers using the // block file. // Once complete we update the m_LocationMap by removing the chunks. // // If the block is non-empty we write out the chunks we want to keep to a new // block file (creating new block files as needed). // // We update the index as we complete each new block file. This makes it possible // to break the GC if we want to limit time for execution. // // GC can fairly parallell to regular operation - it will block while taking // a snapshot of the current m_LocationMap state. // // While moving blocks it will do a blocking operation and update the m_LocationMap // after each new block is written and figuring out the path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t TotalChunkCount = 0; uint64_t DeletedSize = 0; uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); std::vector DeletedChunks; uint64_t MovedCount = 0; const auto _ = MakeGuard([this, &TotalTimer, &WriteBlockTimeUs, &ReadBlockTimeUs, &TotalChunkCount, &DeletedChunks, &MovedCount, &DeletedSize, &OldTotalSize] { ZEN_INFO( "garbage collect from '{}' DONE after {} (write lock: {}, read lock: {}), collected {} bytes, deleted {} and moved {} of {} " "chunks ({}).", m_Config.RootDirectory / m_ContainerBaseName, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceTimeSpanMs(WriteBlockTimeUs / 1000), NiceTimeSpanMs(ReadBlockTimeUs / 1000), NiceBytes(DeletedSize), DeletedChunks.size(), MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); }); LocationMap_t LocationMap; size_t BlockCount; uint64_t ExcludeBlockIndex = 0x800000000ull; { RwLock::SharedLockScope __(m_InsertLock); RwLock::SharedLockScope ___(m_LocationMapLock); Stopwatch Timer; const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs] { WriteBlockTimeUs += Timer.GetElapsedTimeUs(); }); if (m_WriteBlock) { ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); } LocationMap = m_LocationMap; BlockCount = m_ChunkBlocks.size(); } if (LocationMap.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); return; } TotalChunkCount = LocationMap.size(); std::unordered_map BlockIndexToChunkMapIndex; std::vector> KeepChunks; std::vector> DeleteChunks; BlockIndexToChunkMapIndex.reserve(BlockCount); KeepChunks.reserve(BlockCount); DeleteChunks.reserve(BlockCount); size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; std::vector TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); for (const auto& Entry : LocationMap) { TotalChunkHashes.push_back(Entry.first); } uint64_t DeleteCount = 0; uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = LocationMap.find(ChunkHash); const BlockStoreDiskLocation& Location = KeyIt->second; uint32_t BlockIndex = Location.GetBlockIndex(); if (static_cast(BlockIndex) == ExcludeBlockIndex) { return; } auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); size_t ChunkMapIndex = 0; if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) { ChunkMapIndex = KeepChunks.size(); BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; KeepChunks.resize(ChunkMapIndex + 1); KeepChunks.back().reserve(GuesstimateCountPerBlock); DeleteChunks.resize(ChunkMapIndex + 1); DeleteChunks.back().reserve(GuesstimateCountPerBlock); } else { ChunkMapIndex = BlockIndexPtr->second; } if (Keep) { std::vector& ChunkMap = KeepChunks[ChunkMapIndex]; ChunkMap.push_back(ChunkHash); NewTotalSize += Location.GetSize(); } else { std::vector& ChunkMap = DeleteChunks[ChunkMapIndex]; ChunkMap.push_back(ChunkHash); DeleteCount++; } }); std::unordered_set BlocksToReWrite; BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); for (const auto& Entry : BlockIndexToChunkMapIndex) { uint32_t BlockIndex = Entry.first; size_t ChunkMapIndex = Entry.second; const std::vector& ChunkMap = DeleteChunks[ChunkMapIndex]; if (ChunkMap.empty()) { continue; } BlocksToReWrite.insert(BlockIndex); } const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_Config.RootDirectory / m_ContainerBaseName, DeleteCount, NiceBytes(TotalSize - NewTotalSize), TotalChunkCount, NiceBytes(TotalSize)); return; } // Move all chunks in blocks that have chunks removed to new blocks Ref NewBlockFile; uint64_t WriteOffset = 0; uint32_t NewBlockIndex = 0; DeletedChunks.reserve(DeleteCount); std::unordered_map MovedBlockChunks; for (uint32_t BlockIndex : BlocksToReWrite) { const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; Ref OldBlockFile; { RwLock::SharedLockScope _i(m_LocationMapLock); OldBlockFile = m_ChunkBlocks[BlockIndex]; } const std::vector& KeepMap = KeepChunks[ChunkMapIndex]; if (KeepMap.empty()) { const std::vector& DeleteMap = DeleteChunks[ChunkMapIndex]; std::vector LogEntries = MakeCasDiskEntries({}, DeleteMap); m_CasLog.Append(LogEntries); { RwLock::ExclusiveLockScope _i(m_LocationMapLock); Stopwatch Timer; const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); }); UpdateLocations(LogEntries); m_ChunkBlocks[BlockIndex] = nullptr; } DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); ZEN_DEBUG("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex)); std::error_code Ec; OldBlockFile->MarkAsDeleteOnClose(Ec); if (Ec) { ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); } continue; } std::vector Chunk; for (const IoHash& ChunkHash : KeepMap) { auto KeyIt = LocationMap.find(ChunkHash); const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); Chunk.resize(ChunkLocation.Size); OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); std::vector LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); m_CasLog.Append(LogEntries); { RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); }); UpdateLocations(LogEntries); if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) { ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", m_ContainerBaseName, static_cast(std::numeric_limits::max()) + 1); return; } while (m_ChunkBlocks.contains(NextBlockIndex)) { NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); NewBlockFile = new BlockStoreFile(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } MovedCount += MovedBlockChunks.size(); MovedBlockChunks.clear(); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); if (Error) { ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); return; } if (Space.Free < m_MaxBlockSize) { std::filesystem::path GCReservePath = GetGCReservePath(m_Config.RootDirectory, m_ContainerBaseName); if (!std::filesystem::is_regular_file(GCReservePath)) { ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", m_Config.RootDirectory / m_ContainerBaseName, m_MaxBlockSize, NiceBytes(Space.Free)); RwLock::ExclusiveLockScope _l(m_LocationMapLock); Stopwatch Timer; const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); }); m_ChunkBlocks.erase(NextBlockIndex); return; } ZEN_INFO("using gc reserve for '{}', disk free {}", m_Config.RootDirectory / m_ContainerBaseName, NiceBytes(Space.Free)); std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); std::filesystem::rename(GCReservePath, NewBlockPath); NewBlockFile->Open(); } else { NewBlockFile->Create(m_MaxBlockSize); } NewBlockIndex = NextBlockIndex; WriteOffset = 0; } NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); MovedBlockChunks.emplace( ChunkHash, BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment)); WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); } Chunk.clear(); const std::vector& DeleteMap = DeleteChunks[ChunkMapIndex]; std::vector LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); m_CasLog.Append(LogEntries); { RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); }); UpdateLocations(LogEntries); m_ChunkBlocks[BlockIndex] = nullptr; } MovedCount += MovedBlockChunks.size(); DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); MovedBlockChunks.clear(); ZEN_DEBUG("marking cas store file for delete {}, block index {}", m_ContainerBaseName, BlockIndex); std::error_code Ec; OldBlockFile->MarkAsDeleteOnClose(Ec); if (Ec) { ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); } OldBlockFile = nullptr; } for (const IoHash& ChunkHash : DeletedChunks) { DeletedSize += LocationMap[ChunkHash].GetSize(); } GcCtx.DeletedCas(DeletedChunks); std::filesystem::path GCReservePath = GetGCReservePath(m_Config.RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(GCReservePath)) { return; } std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); if (Error) { ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); return; } if (Space.Free < m_MaxBlockSize) { ZEN_INFO("not enough space for garbage collect reserve '{}' FAILED, required disk space {}, free {}", m_Config.RootDirectory / m_ContainerBaseName, m_MaxBlockSize, NiceBytes(Space.Free)); return; } BasicFile GCReserveFile; CreateDirectories(GCReservePath.parent_path()); GCReserveFile.Open(GCReservePath, BasicFile::EMode::kTruncate); GCReserveFile.SetFileSize(m_MaxBlockSize); ZEN_DEBUG("recreated garbage collect reserve '{}', {} bytes", m_Config.RootDirectory / m_ContainerBaseName, NiceBytes(Space.Free)); } void CasContainerStrategy::MakeIndexSnapshot() { // Flush on done ZEN_INFO("write store {} snapshot", m_Config.RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([this, &EntryCount, &Timer] { ZEN_INFO("write store {} snapshot containing {} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); fs::path STmplogPath = GetTempLogPath(m_Config.RootDirectory, m_ContainerBaseName); fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); fs::path SRecoveredlogPath = GetRecoverLogPath(m_Config.RootDirectory, m_ContainerBaseName); // Index away, we keep it if something goes wrong if (fs::is_regular_file(STmpSidxPath)) { fs::remove(STmpSidxPath); } if (fs::is_regular_file(SidxPath)) { fs::rename(SidxPath, STmpSidxPath); } // Move cas away, we keep it if something goes wrong, any new chunks will be added to the new log { RwLock::ExclusiveLockScope __(m_InsertLock); RwLock::ExclusiveLockScope ___(m_LocationMapLock); m_CasLog.Close(); if (fs::is_regular_file(STmplogPath)) { fs::remove(STmplogPath); } fs::rename(SlogPath, STmplogPath); // Open an new log m_CasLog.Open(SlogPath, CasLogFile::EMode::kTruncate); } try { // Write the current state of the location map to a new index state std::vector Entries; { RwLock::SharedLockScope __(m_LocationMapLock); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } } BasicFile ObjectIndexFile; ObjectIndexFile.Open(SidxPath, BasicFile::EMode::kTruncate); CasDiskIndexHeader Header = {.PayloadAlignment = gsl::narrow(m_PayloadAlignment), .EntryCount = Entries.size()}; ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); ObjectIndexFile.Close(); EntryCount = Entries.size(); } catch (std::exception& Err) { ZEN_ERROR("snapshot FAILED, reason '{}'", Err.what()); // Reconstruct the log from old log and any added log entries RwLock::ExclusiveLockScope __(m_LocationMapLock); if (fs::is_regular_file(STmplogPath)) { std::vector Records; Records.reserve(m_LocationMap.size()); { TCasLogFile OldCasLog; OldCasLog.Open(STmplogPath, CasLogFile::EMode::kRead); OldCasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); } { m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); } TCasLogFile RecoveredCasLog; RecoveredCasLog.Open(SRecoveredlogPath, CasLogFile::EMode::kWrite); RecoveredCasLog.Append(Records); RecoveredCasLog.Close(); fs::remove(SlogPath); fs::rename(SRecoveredlogPath, SlogPath); fs::remove(STmplogPath); } if (fs::is_regular_file(SidxPath)) { fs::remove(SidxPath); } // Restore any previous snapshot if (fs::is_regular_file(STmpSidxPath)) { fs::remove(SidxPath); fs::rename(STmpSidxPath, SidxPath); } } if (fs::is_regular_file(STmpSidxPath)) { fs::remove(STmpSidxPath); } if (fs::is_regular_file(STmplogPath)) { fs::remove(STmplogPath); } } void CasContainerStrategy::OpenContainer(bool IsNewStore) { // Add .running file and delete on clean on close to detect bad termination m_TotalSize = 0; m_LocationMap.clear(); std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); if (IsNewStore) { std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName); std::filesystem::remove(LegacyLogPath); std::filesystem::remove(LegacySobsPath); std::filesystem::remove_all(BasePath); } { std::vector IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment); for (const CasDiskIndexEntry& Entry : IndexEntries) { // Log and skip entries that don't makes sense m_LocationMap[Entry.Key] = Entry.Location; } } bool MakeSnapshot = false; { std::vector LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName); for (const CasDiskIndexEntry& Entry : LogEntries) { // Log and skip entries that don't makes sense if (Entry.Flags & CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Entry.Key); continue; } m_LocationMap[Entry.Key] = Entry.Location; } MakeSnapshot = !LogEntries.empty(); } if (std::filesystem::is_regular_file(LegacyLogPath)) { std::unordered_set ExistingChunks; ExistingChunks.reserve(m_LocationMap.size()); for (const auto& Entry : m_LocationMap) { ExistingChunks.insert(Entry.first); } std::vector LegacyEntries = MigrateLegacyData(m_Config.RootDirectory, m_ContainerBaseName, m_MaxBlockSize, m_PayloadAlignment, true, ExistingChunks); for (const CasDiskIndexEntry& Entry : LegacyEntries) { // Log and skip entries that don't makes sense m_LocationMap[Entry.Key] = Entry.Location; } MakeSnapshot |= !LegacyEntries.empty(); } CreateDirectories(BasePath); std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); m_CasLog.Open(SlogPath, CasLogFile::EMode::kWrite); std::unordered_set KnownBlocks; for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& Location = Entry.second; m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_release); KnownBlocks.insert(Location.GetBlockIndex()); } if (std::filesystem::is_directory(m_BlocksBasePath)) { std::vector FoldersToScan; FoldersToScan.push_back(m_BlocksBasePath); size_t FolderOffset = 0; while (FolderOffset < FoldersToScan.size()) { for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) { if (Entry.is_directory()) { FoldersToScan.push_back(Entry.path()); continue; } if (Entry.is_regular_file()) { const std::filesystem::path Path = Entry.path(); if (Path.extension() != DataExtension) { continue; } std::string FileName = Path.stem().string(); uint32_t BlockIndex; bool OK = ParseHexNumber(FileName, BlockIndex); if (!OK) { continue; } if (!KnownBlocks.contains(BlockIndex)) { // Log removing unreferenced block // Clear out unused blocks std::filesystem::remove(Path); continue; } std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); Ref BlockFile = new BlockStoreFile(BlockPath); BlockFile->Open(); m_ChunkBlocks[BlockIndex] = BlockFile; } } ++FolderOffset; } } // Create GC reserve file if possible std::filesystem::path GCReservePath = GetGCReservePath(m_Config.RootDirectory, m_ContainerBaseName); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); if (Error) { ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); return; } BasicFile GCReserveFile; if (std::filesystem::is_regular_file(GCReservePath)) { GCReserveFile.Open(GCReservePath, BasicFile::EMode::kWrite); std::uint64_t CurrentSize = GCReserveFile.FileSize(); if ((Space.Free - CurrentSize) >= m_MaxBlockSize) { GCReserveFile.SetFileSize(m_MaxBlockSize); } else { // We need it to be the proper size if we are to use it ZEN_WARN("removing gc reserve {}, not enough space free on drive, need {}, have {} ", m_Config.RootDirectory / m_ContainerBaseName, NiceBytes(m_MaxBlockSize), NiceBytes(Space.Free)); std::filesystem::remove(GCReservePath); } } else { if (Space.Free >= m_MaxBlockSize) { CreateDirectories(GCReservePath.parent_path()); GCReserveFile.Open(GCReservePath, BasicFile::EMode::kTruncate); GCReserveFile.SetFileSize(m_MaxBlockSize); } else { ZEN_WARN("can't create gc reserve {}, not enough space free on drive, need {}, have {} ", m_Config.RootDirectory / m_ContainerBaseName, NiceBytes(m_MaxBlockSize), NiceBytes(Space.Free)); } } if (MakeSnapshot) { MakeIndexSnapshot(); } // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace { static IoBuffer CreateChunk(uint64_t Size) { static std::random_device rd; static std::mt19937 g(rd()); std::vector Values; Values.resize(Size); for (size_t Idx = 0; Idx < Size; ++Idx) { Values[Idx] = static_cast(Idx); } std::shuffle(Values.begin(), Values.end(), g); return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); } } // namespace TEST_CASE("compactcas.hex") { uint32_t Value; std::string HexString; CHECK(!ParseHexNumber("", Value)); char Hex[9]; ToHexNumber(0, Hex); HexString = std::string(Hex); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0); ToHexNumber(std::numeric_limits::max(), Hex); HexString = std::string(Hex); CHECK(HexString == "ffffffff"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == std::numeric_limits::max()); ToHexNumber(0xadf14711, Hex); HexString = std::string(Hex); CHECK(HexString == "adf14711"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0xadf14711); ToHexNumber(0x80000000, Hex); HexString = std::string(Hex); CHECK(HexString == "80000000"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x80000000); ToHexNumber(0x718293a4, Hex); HexString = std::string(Hex); CHECK(HexString == "718293a4"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x718293a4); } TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); const int kIterationCount = 1000; std::vector Keys(kIterationCount); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { CbObjectWriter Cbo; Cbo << "id" << i; CbObject Obj = Cbo.Save(); IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); const IoHash Hash = HashBuffer(ObjBuffer); Cas.InsertChunk(ObjBuffer, Hash); Keys[i] = Hash; } for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } // Validate that we can still read the inserted data after closing // the original cas store { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } } TEST_CASE("compactcas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateChunk(kChunkSize); const IoHash Hash = HashBuffer(Chunk); CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } } } TEST_CASE("compactcas.gc.basic") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); Cas.Flush(); GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHash)); } TEST_CASE("compactcas.gc.removefile") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 65536, 1 << 4, true); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); CHECK(!InsertResultDup.New); Cas.Flush(); } CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 65536, 1 << 4, false); GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHash)); } TEST_CASE("compactcas.gc.compact") { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 2048, 1 << 4, true); uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; std::vector Chunks; Chunks.reserve(9); for (uint64_t Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(9); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(Cas.HaveChunk(ChunkHashes[1])); CHECK(Cas.HaveChunk(ChunkHashes[2])); CHECK(Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); uint64_t InitialSize = Cas.StorageSize().DiskSize; // Keep first and last { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); } Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); // Keep last { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); } // Keep mixed { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(!Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[8], ChunkHashes[8]); } // Keep multiple at end { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); } // Keep every other { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[2]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); } // Verify that we nicely appended blocks even after all GC operations CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); uint64_t FinalSize = Cas.StorageSize().DiskSize; CHECK(InitialSize == FinalSize); } } TEST_CASE("compactcas.gc.deleteblockonopen") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; std::vector Chunks; Chunks.reserve(20); for (uint64_t Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(20); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } // GC every other block { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; for (size_t i = 0; i < 20; i += 2) { KeepChunks.push_back(ChunkHashes[i]); } GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } { // Re-open CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 1024, 16, false); for (size_t i = 0; i < 20; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } TEST_CASE("compactcas.gc.handleopeniobuffer") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; std::vector Chunks; Chunks.reserve(20); for (const uint64_t& Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(20); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); Cas.Flush(); // GC everything GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i++) { CHECK(!Cas.HaveChunk(ChunkHashes[i])); } CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); } TEST_CASE("compactcas.legacyconversion") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048}; size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); size_t SingleBlockSize = 0; std::vector Chunks; Chunks.reserve(ChunkCount); for (uint64_t Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); SingleBlockSize += Size; } std::vector ChunkHashes; ChunkHashes.reserve(ChunkCount); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", gsl::narrow(SingleBlockSize * 2), 16, true); for (size_t i = 0; i < ChunkCount; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } std::vector KeepChunks; for (size_t i = 0; i < ChunkCount; i += 2) { KeepChunks.push_back(ChunkHashes[i]); } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepChunks); Cas.Flush(); Gc.CollectGarbage(GcCtx); } std::filesystem::path CasPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); std::filesystem::path LegacyCasPath = GetLegacyUcasPath(CasConfig.RootDirectory, "test"); std::filesystem::rename(CasPath, LegacyCasPath); std::vector LogEntries; std::filesystem::path SidxPath = GetIndexPath(CasConfig.RootDirectory, "test"); if (std::filesystem::is_regular_file(SidxPath)) { BasicFile ObjectIndexFile; ObjectIndexFile.Open(SidxPath, BasicFile::EMode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CasDiskIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) { LogEntries.resize(Header.EntryCount); ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); } } ObjectIndexFile.Close(); std::filesystem::remove(SidxPath); } std::filesystem::path SlogPath = GetLogPath(CasConfig.RootDirectory, "test"); { TCasLogFile CasLog; CasLog.Open(SlogPath, CasLogFile::EMode::kRead); CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }); } TCasLogFile LegacyCasLog; std::filesystem::path SLegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); LegacyCasLog.Open(SLegacylogPath, CasLogFile::EMode::kTruncate); for (const CasDiskIndexEntry& Entry : LogEntries) { BlockStoreLocation Location = Entry.Location.Get(16); LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size); LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation, .ContentType = Entry.ContentType, .Flags = Entry.Flags}; LegacyCasLog.Append(LegacyEntry); } LegacyCasLog.Close(); std::filesystem::remove_all(CasConfig.RootDirectory / "test"); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 2048, 16, false); for (size_t i = 0; i < ChunkCount; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 8192; std::vector ChunkHashes; ChunkHashes.reserve(kChunkCount); std::vector Chunks; Chunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateChunk(kChunkSize); IoHash Hash = HashBuffer(Chunk); ChunkHashes.emplace_back(Hash); Chunks.emplace_back(Chunk); } WorkerThreadPool ThreadPool(4); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 32768, 16, true); { for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { const IoBuffer& Chunk = Chunks[Idx]; const IoHash& Hash = ChunkHashes[Idx]; ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); }); } ThreadPool.Flush(); } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); { std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { IoHash ChunkHash = OldChunkHashes[Idx]; IoBuffer Chunk = Cas.FindChunk(ChunkHash); IoHash Hash = IoHash::HashBuffer(Chunk); CHECK(ChunkHash == Hash); }); } ThreadPool.Flush(); } std::unordered_set GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); { std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); std::vector NewChunkHashes; NewChunkHashes.reserve(kChunkCount); std::vector NewChunks; NewChunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateChunk(kChunkSize); IoHash Hash = HashBuffer(Chunk); NewChunkHashes.emplace_back(Hash); NewChunks.emplace_back(Chunk); } RwLock ChunkHashesLock; std::atomic_uint32_t AddedChunkCount; for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { const IoBuffer& Chunk = NewChunks[Idx]; const IoHash& Hash = NewChunkHashes[Idx]; ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); AddedChunkCount.fetch_add(1); }); ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { IoHash ChunkHash = OldChunkHashes[Idx]; IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); if (Chunk) { CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); } }); } while (AddedChunkCount.load() < kChunkCount) { std::vector AddedHashes; { RwLock::ExclusiveLockScope _(ChunkHashesLock); AddedHashes.swap(NewChunkHashes); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope for (const IoHash& ChunkHash : AddedHashes) { if (Cas.HaveChunk(ChunkHash)) { GcChunkHashes.emplace(ChunkHash); } } std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); int32_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) { if (C < KeepHashes.size() - 1) { KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } if (C + 3 < KeepHashes.size() - 1) { KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } } C++; } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); Cas.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } ThreadPool.Flush(); { std::vector AddedHashes; { RwLock::ExclusiveLockScope _(ChunkHashesLock); AddedHashes.swap(NewChunkHashes); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope for (const IoHash& ChunkHash : AddedHashes) { if (Cas.HaveChunk(ChunkHash)) { GcChunkHashes.emplace(ChunkHash); } } std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); int32_t C = 0; while (C < KeepHashes.size()) { if (C % 77 == 0 && C < KeepHashes.size() - 1) { KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } C++; } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); Cas.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } } { for (const IoHash& ChunkHash : GcChunkHashes) { ThreadPool.ScheduleWork([&Cas, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); }); } ThreadPool.Flush(); } } } TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true)) { auto getChunkSet = [](const std::vector& Entries) { std::unordered_set ChunkHashes; ChunkHashes.reserve(Entries.size()); for (const auto& Entry : Entries) { ChunkHashes.insert(Entry.Key); } return ChunkHashes; }; const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas"; std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs"); std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); std::filesystem::remove_all(TobsBasePath); std::filesystem::remove_all(SobsBasePath); uint64_t TobsPayloadAlignment = 16; uint64_t TobsBlockSize = 1u << 28; std::vector TobsMigratedChunks = MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {}); CHECK(TobsMigratedChunks.size() > 0); uint64_t SobsPayloadAlignment = 4096; uint64_t SobsBlockSize = 1u << 30; std::vector SobsMigratedChunks = MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {}); CHECK(SobsMigratedChunks.size() > 0); CasStoreConfiguration CasConfig; CasConfig.RootDirectory = BigDataPath; CasGc TobsCasGc; CasContainerStrategy TobsCas(CasConfig, TobsCasGc); TobsCas.Initialize("tobs", 1u << 28, 16, false); GcContext TobsGcCtx; TobsCas.CollectGarbage(TobsGcCtx); for (const CasDiskIndexEntry& Entry : TobsMigratedChunks) { CHECK(TobsCas.HaveChunk(Entry.Key)); } CasGc SobsCasGc; CasContainerStrategy SobsCas(CasConfig, SobsCasGc); SobsCas.Initialize("sobs", 1u << 30, 4096, false); GcContext SobsGcCtx; SobsCas.CollectGarbage(SobsGcCtx); for (const CasDiskIndexEntry& Entry : SobsMigratedChunks) { CHECK(SobsCas.HaveChunk(Entry.Key)); } } #endif void compactcas_forcelink() { } } // namespace zen