// Copyright Epic Games, Inc. All Rights Reserved. #include "compactcas.h" #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { namespace { uint64_t AlignPositon(uint64_t Offset, uint64_t Alignment) { return (Offset + Alignment - 1) & ~(Alignment - 1); } std::filesystem::path BuildUcasPath(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, const uint16_t BlockIndex) { return RootDirectory / (std::string(ContainerBaseName) + "." + (std::to_string(BlockIndex) + ".ucas")); } void MarkFileAsDeleteOnClose(void* FileHandle) { #if ZEN_PLATFORM_WINDOWS FILE_DISPOSITION_INFO Fdi{}; Fdi.DeleteFile = TRUE; BOOL Success = SetFileInformationByHandle(FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); if (!Success) { ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'", PathFromHandle(FileHandle), GetLastErrorAsString()); } #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); if (unlink(SourcePath.c_str()) < 0) { int UnlinkError = zen::GetLastError(); if (UnlinkError != ENOENT) { ZEN_WARN("unlink of CAS payload file failed ('{}')", GetSystemErrorAsString(UnlinkError)); } } #endif } } // namespace CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc) , m_Config(Config) , m_Log(logging::Get("containercas")) { } CasContainerStrategy::~CasContainerStrategy() { } void CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); ZEN_ASSERT(MaxBlockSize > 0); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; OpenContainer(IsNewStore); m_IsInitialized = true; } CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { RwLock::ExclusiveLockScope _i(m_InsertLock); { RwLock::SharedLockScope _l(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt != m_LocationMap.end()) { return CasStore::InsertResult{.New = false}; } } // New entry uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize(); if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize) { RwLock::ExclusiveLockScope __(m_LocationMapLock); uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; while (m_OpenBlocks.contains(NewBlockIndex)) { NewBlockIndex++; if (NewBlockIndex == m_CurrentBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); } } m_CurrentBlockIndex = NewBlockIndex; std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); auto SmallObjectFile = std::make_shared(); SmallObjectFile->Open(path, true); m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; m_CurrentBlock = SmallObjectFile; m_CurrentInsertOffset = 0; } const uint32_t InsertOffset = m_CurrentInsertOffset; m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset); m_CurrentInsertOffset = static_cast(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment)); const CasDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast(ChunkSize)}; CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; RwLock::ExclusiveLockScope __(m_LocationMapLock); m_LocationMap[ChunkHash] = Location; m_TotalSize.fetch_add(static_cast(ChunkSize)); m_CasLog.Append(IndexEntry); return CasStore::InsertResult{.New = true}; } CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end()) { return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size); } } // Not found return IoBuffer(); } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); return m_LocationMap.contains(ChunkHash); } void CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would // benefit from a better implementation which removes // items incrementally for large sets, especially when // we're likely to already have a large proportion of the // chunks in the set InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void CasContainerStrategy::Flush() { RwLock::SharedLockScope _(m_InsertLock); m_CasLog.Flush(); m_CurrentBlock.lock()->Flush(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { const uint64_t WindowSize = 4 * 1024 * 1024; std::vector BigChunks; std::vector BadChunks; // We do a read sweep through the payloads file and validate // any entries that are contained within each segment, with // the assumption that most entries will be checked in this // pass. An alternative strategy would be to use memory mapping. { IoBuffer ReadBuffer{WindowSize}; void* BufferBase = ReadBuffer.MutableData(); RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? RwLock::SharedLockScope __(m_LocationMapLock); for (const auto& Block : m_OpenBlocks) { uint64_t WindowStart = 0; uint64_t WindowEnd = WindowSize; auto& SmallObjectFile = *Block.second; const uint64_t FileSize = SmallObjectFile.FileSize(); do { const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); for (auto& Entry : m_LocationMap) { const uint64_t EntryOffset = Entry.second.Offset; if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { const uint64_t EntryEnd = EntryOffset + Entry.second.Size; if (EntryEnd >= WindowEnd) { BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); continue; } const IoHash ComputedHash = IoHash::HashBuffer(reinterpret_cast(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size); if (Entry.first != ComputedHash) { // Hash mismatch BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); } } } WindowStart += WindowSize; WindowEnd += WindowSize; } while (WindowStart < FileSize); } // Deal with large chunks for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; auto& SmallObjectFile = *m_OpenBlocks[Entry.Location.BlockIndex]; SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); if (Entry.Key != ComputedHash) { BadChunks.push_back(Entry); } } } if (BadChunks.empty()) { return; } ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); // Deal with bad chunks by removing them from our lookup map std::vector BadChunkHashes; RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& Entry : BadChunks) { BadChunkHashes.push_back(Entry.Key); m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(Entry.Key); } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCasChunks(BadChunkHashes); } void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { namespace fs = std::filesystem; // It collects all the blocks that we want to delete chunks from. For each such // block we keep a list of chunks to retain. // // It will first remove any chunks that are flushed from the m_LocationMap. // // It then checks to see if we want to purge any chunks that are in the currently // active block. If so, we break off the current block and start on a new block, // otherwise we just let the active block be. // // Next it will iterate over all blocks that we want to remove chunks from. // If the block is empty after removal of chunks we mark the block as pending // delete - we want to delete it as soon as there are no IoBuffers using the // block file. // // If the block is non-empty we write out the chunks we want to keep to a new // block file (creating new block files as needed). // // We update the index as we complete each new block file. This makes it possible // to break the GC if we want to limit time for execution. // // GC can fairly parallell to regular operation - it will block while figuring // out which chunks to remove and what blocks to rewrite but the actual // reading and writing of data to new block files does not block regular operation. // // While moving blocks it will do a blocking operation and update the m_LocationMap // after each new block is written and it will also block when figuring out the // path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); std::unordered_map BlockIndexToKeepChunksMap; std::vector> KeepChunks; std::vector DeletedChunks; std::unordered_set BlocksToReWrite; { RwLock::ExclusiveLockScope _i(m_InsertLock); RwLock::ExclusiveLockScope _l(m_LocationMapLock); m_CasLog.Flush(); m_CurrentBlock.lock()->Flush(); BlocksToReWrite.reserve(m_OpenBlocks.size()); if (m_LocationMap.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); return; } const uint64_t TotalChunkCount = m_LocationMap.size(); uint64_t TotalSize = m_TotalSize.load(); std::vector TotalChunkHashes; TotalChunkHashes.reserve(m_LocationMap.size()); for (const auto& Entry : m_LocationMap) { TotalChunkHashes.push_back(Entry.first); if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex)) { continue; } BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size(); KeepChunks.resize(KeepChunks.size() + 1); } const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { if (Keep) { auto KeyIt = m_LocationMap.find(ChunkHash); const auto& ChunkLocation = KeyIt->second; auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]]; ChunkMap[ChunkHash] = ChunkLocation; NewTotalSize += ChunkLocation.Size; } else { DeletedChunks.push_back(ChunkHash); } }); if (!PerformDelete) { ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_Config.RootDirectory / m_ContainerBaseName, DeletedChunks.size(), NiceBytes(TotalSize - NewTotalSize), TotalChunkCount, NiceBytes(TotalSize)); return; } for (const auto& ChunkHash : DeletedChunks) { auto KeyIt = m_LocationMap.find(ChunkHash); const auto& ChunkLocation = KeyIt->second; BlocksToReWrite.insert(ChunkLocation.BlockIndex); m_CasLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(ChunkHash); m_TotalSize.fetch_sub(static_cast(ChunkLocation.Size)); } // TODO: Be smarter about terminating current block - we should probably not rewrite if there is just // a small amount of bytes to gain. if (BlocksToReWrite.contains(m_CurrentBlockIndex)) { uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; while (m_OpenBlocks.contains(NewBlockIndex)) { NewBlockIndex++; if (NewBlockIndex == m_CurrentBlockIndex) { ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", m_ContainerBaseName, std::numeric_limits::max() + 1); return; } } m_CurrentBlockIndex = NewBlockIndex; std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); auto SmallObjectFile = std::make_shared(); SmallObjectFile->Open(path, true); m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; m_CurrentBlock = SmallObjectFile; m_CurrentInsertOffset = 0; } } // Move all chunks in blocks that have chunks removed to new blocks std::shared_ptr NewBlockFile; uint64_t WriteOffset = {}; uint16_t NewBlockIndex = {}; std::unordered_map MovedBlocks; for (auto BlockIndex : BlocksToReWrite) { auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]]; if (ChunkMap.empty()) { // The block has no references to it, it should be removed as soon as no references is held on the file // TODO: We currently don't know if someone is holding a IoBuffer for this block at this point! // We want one IoBuffer that owns each block and use that to keep stuff alive using Owning strategy // From that IoBuffer we fetch the file handle // When we are done we dispose that IoBuffer and drop the file handle // Can we create a Sub-IoBuffer from our main buffer even if the size has grown past the initial // size when creating it? RwLock::ExclusiveLockScope _i(m_InsertLock); auto BlockFile = m_OpenBlocks[BlockIndex]; auto FileHandle = BlockFile->Handle(); // m_OpenBlocks.erase(BlockIndex); // BlockFile->Close(); // fs::remove(BlockPath); ZEN_INFO("marking cas store file for delete {}, count limit {} exeeded", m_ContainerBaseName, PathFromHandle(FileHandle)); MarkFileAsDeleteOnClose(FileHandle); continue; } std::shared_ptr BlockFile; { RwLock::ExclusiveLockScope _i(m_InsertLock); BlockFile = m_OpenBlocks[BlockIndex]; } { std::vector Chunk; for (auto& Entry : ChunkMap) { const CasDiskLocation& ChunkLocation = Entry.second; Chunk.resize(ChunkLocation.Size); BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { { RwLock::ExclusiveLockScope _i(m_InsertLock); if (NewBlockFile) { m_OpenBlocks[NewBlockIndex] = NewBlockFile; RwLock::ExclusiveLockScope _l(m_LocationMapLock); for (const auto& MovedEntry : MovedBlocks) { m_LocationMap[MovedEntry.first] = MovedEntry.second; m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); } } NewBlockIndex = m_CurrentBlockIndex + 1; while (m_OpenBlocks.contains(NewBlockIndex)) { NewBlockIndex++; if (NewBlockIndex == m_CurrentBlockIndex) { ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", m_ContainerBaseName, std::numeric_limits::max() + 1); return; } } m_OpenBlocks[NewBlockIndex] = std::shared_ptr(); // Make sure nobody steals this slot } std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); if (Error) { ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); return; } if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space { ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", m_Config.RootDirectory / m_ContainerBaseName, m_MaxBlockSize * m_MaxBlockSize, NiceBytes(Space.Free)); RwLock::ExclusiveLockScope _i(m_InsertLock); m_OpenBlocks.erase(NewBlockIndex); return; } std::filesystem::path NewBlockPath = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex); NewBlockFile = std::make_shared(); NewBlockFile->Open(NewBlockPath, true); MovedBlocks.clear(); WriteOffset = 0; } NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); CasDiskLocation NewChunkLocation(NewBlockIndex, gsl::narrow(WriteOffset), gsl::narrow(Chunk.size())); Entry.second = {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow(WriteOffset), .Size = gsl::narrow(Chunk.size())}; MovedBlocks[Entry.first] = Entry.second; WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); } Chunk.clear(); // Remap moved chunks to the new block file RwLock::ExclusiveLockScope _i(m_InsertLock); if (NewBlockFile) { m_OpenBlocks[NewBlockIndex] = NewBlockFile; RwLock::ExclusiveLockScope _l(m_LocationMapLock); for (const auto& MovedEntry : MovedBlocks) { m_LocationMap[MovedEntry.first] = MovedEntry.second; m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); } } auto FileHandle = BlockFile->Handle(); // m_OpenBlocks.erase(BlockIndex); // BlockFile->Close(); // fs::remove(BlockPath); ZEN_INFO("marking cas store file for delete {}, count limit {} exeeded", m_ContainerBaseName, PathFromHandle(FileHandle)); MarkFileAsDeleteOnClose(FileHandle); } } GcCtx.DeletedCas(DeletedChunks); ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_Config.RootDirectory / m_ContainerBaseName, DeletedChunks.size()); MakeIndexSnapshot(); } void CasContainerStrategy::MakeIndexSnapshot() { ZEN_INFO("writing index snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName); namespace fs = std::filesystem; fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); fs::path STmplogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".tmp.ulog"); fs::path STmpSidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".tmp.uidx"); fs::path SRecoveredlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".recover.ulog"); // Move cas and index away, we keep them if something goes wrong, any new chunks will be added to the new log { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_CasLog.Close(); if (fs::exists(STmplogPath)) { fs::remove(STmplogPath); } if (fs::exists(STmpSidxPath)) { fs::remove(STmpSidxPath); } fs::rename(SlogPath, STmplogPath); if (fs::exists(SidxPath)) { fs::rename(SidxPath, STmpSidxPath); } // Open an new log m_CasLog.Open(SlogPath, true); } try { // Write the current state of the location map to a new index state std::vector Entries; { RwLock::SharedLockScope _l(m_LocationMapLock); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } } BasicFile SmallObjectIndex; SmallObjectIndex.Open(SidxPath, true); SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); SmallObjectIndex.Close(); } catch (std::exception& Err) { ZEN_ERROR("snapshot FAILED, reason '{}'", Err.what()); // Reconstruct the log from old log and any added log entries RwLock::ExclusiveLockScope _(m_LocationMapLock); if (fs::exists(STmplogPath)) { std::vector Records; Records.reserve(m_LocationMap.size()); { TCasLogFile OldCasLog; OldCasLog.Open(STmplogPath, false); OldCasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); } { m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); } TCasLogFile RecoveredCasLog; RecoveredCasLog.Open(SRecoveredlogPath, true); for (const auto& Record : Records) { RecoveredCasLog.Append(Record); } RecoveredCasLog.Close(); fs::remove(SlogPath); fs::rename(SRecoveredlogPath, SlogPath); fs::remove(STmplogPath); } if (fs::exists(SidxPath)) { fs::remove(SidxPath); } // Restore any previous snapshot if (fs::exists(STmpSidxPath)) { fs::remove(SidxPath); fs::rename(STmpSidxPath, SidxPath); } } if (fs::exists(STmpSidxPath)) { fs::remove(STmpSidxPath); } } void CasContainerStrategy::OpenContainer(bool IsNewStore) { // TODO: Pick up old Cas store format so we can use it in our store std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); m_TotalSize = 0; m_LocationMap.clear(); std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); if (std::filesystem::exists(SidxPath)) { BasicFile SmallObjectIndex; SmallObjectIndex.Open(SidxPath, false); uint64_t Size = SmallObjectIndex.FileSize(); uint64_t EntryCount = Size / sizeof(CasDiskIndexEntry); std::vector Entries{EntryCount}; SmallObjectIndex.Read(Entries.data(), Size, 0); for (const auto& Entry : Entries) { m_LocationMap[Entry.Key] = Entry.Location; } SmallObjectIndex.Close(); } m_CasLog.Open(SlogPath, IsNewStore); m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { if (Record.Flags & CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Record.Key); } else { m_LocationMap[Record.Key] = Record.Location; } }); std::unordered_set ReferencedBlockIndexes; for (const auto& Entry : m_LocationMap) { const auto& Location = Entry.second; m_TotalSize.fetch_add(Location.Size); ReferencedBlockIndexes.insert(Location.BlockIndex); } uint32_t SmallestBlockSize = 0xffffffffu; for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.RootDirectory)) { if (Entry.is_regular_file()) { // TODO: Clean up naming/storage structure so we don't have to do this complicated parsing to find our ucas files if (Entry.path().extension() != ".ucas") { continue; } std::string FileName = Entry.path().stem().string(); if (!FileName.starts_with(m_ContainerBaseName)) { continue; } if (IsNewStore) { std::filesystem::remove(Entry.path()); continue; } try { uint16_t BlockIndex = static_cast(std::stoi(FileName.substr(m_ContainerBaseName.length() + 1))); if (!ReferencedBlockIndexes.contains(BlockIndex)) { // Clear out unused blocks std::filesystem::remove(Entry.path()); continue; } auto SmallObjectFile = std::make_shared(); SmallObjectFile->Open(Entry.path(), false); m_OpenBlocks[BlockIndex] = SmallObjectFile; if (SmallObjectFile->FileSize() < SmallestBlockSize) { m_CurrentBlockIndex = BlockIndex; SmallestBlockSize = gsl::narrow(SmallObjectFile->FileSize()); } } catch (const std::invalid_argument&) { // Non-valid file, skip it (or should we remove it?) } } } if (m_OpenBlocks.empty()) { std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); auto SmallObjectFile = std::make_shared(); SmallObjectFile->Open(path, true); m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; m_CurrentBlock = SmallObjectFile; m_CurrentInsertOffset = 0; } else { m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex]; m_CurrentInsertOffset = static_cast(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment)); } // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace { static IoBuffer CreateChunk(uint64_t Size) { static std::random_device rd; static std::mt19937 g(rd()); std::vector Values; Values.resize(Size); for (size_t Idx = 0; Idx < Size; ++Idx) { Values[Idx] = static_cast(Idx); } std::shuffle(Values.begin(), Values.end(), g); return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); } } // namespace TEST_CASE("cas.compact.gc") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); const int kIterationCount = 1000; std::vector Keys(kIterationCount); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { CbObjectWriter Cbo; Cbo << "id" << i; CbObject Obj = Cbo.Save(); IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); const IoHash Hash = HashBuffer(ObjBuffer); Cas.InsertChunk(ObjBuffer, Hash); Keys[i] = Hash; } for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } // Validate that we can still read the inserted data after closing // the original cas store { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } GcContext Ctx; Cas.CollectGarbage(Ctx); } } TEST_CASE("cas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateChunk(kChunkSize); const IoHash Hash = HashBuffer(Chunk); auto InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } } TEST_CASE("cas.gc.basic") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash); GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHash)); } TEST_CASE("cas.gc.compact") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 2048, 1 << 4, true); uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; std::vector Chunks; Chunks.reserve(9); for (const auto& Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(9); for (const auto& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(Cas.HaveChunk(ChunkHashes[1])); CHECK(Cas.HaveChunk(ChunkHashes[2])); CHECK(Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); auto InitialSize = Cas.StorageSize().DiskSize; // Keep first and last { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); } Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); // Keep last { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); } // Keep mixed { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); GcCtx.ContributeCas(KeepChunks); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(!Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[8], ChunkHashes[8]); } // Keep multiple at end { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); } // Keep every other { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[2]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); } // Verify that we nicely appended blocks even after all GC operations CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); auto FinalSize = Cas.StorageSize().DiskSize; CHECK(InitialSize == FinalSize); } TEST_CASE("cas.gc.deleteblockonopen") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; std::vector Chunks; Chunks.reserve(20); for (const auto& Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(20); for (const auto& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } // GC every other block { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; for (size_t i = 0; i < 20; i += 2) { KeepChunks.push_back(ChunkHashes[i]); } GcCtx.ContributeCas(KeepChunks); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } { // Re-open CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 1024, 16, false); for (size_t i = 0; i < 20; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } TEST_CASE("cas.gc.handleopeniobuffer") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; std::vector Chunks; Chunks.reserve(20); for (const auto& Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(20); for (const auto& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } auto RetainChunk = Cas.FindChunk(ChunkHashes[5]); // GC everything GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i++) { CHECK(!Cas.HaveChunk(ChunkHashes[i])); } CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); } #endif void compactcas_forcelink() { } } // namespace zen