// Copyright Epic Games, Inc. All Rights Reserved. #include #include "compactcas.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc) , m_Config(Config) , m_Log(logging::Get("containercas")) { } CasContainerStrategy::~CasContainerStrategy() { } void CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; OpenContainer(IsNewStore); m_IsInitialized = true; } CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { { RwLock::SharedLockScope _(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt != m_LocationMap.end()) { return CasStore::InsertResult{.New = false}; } } // New entry RwLock::ExclusiveLockScope _(m_InsertLock); const uint64_t InsertOffset = m_CurrentInsertOffset; m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); m_CurrentInsertOffset = (m_CurrentInsertOffset + ChunkSize + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); RwLock::ExclusiveLockScope __(m_LocationMapLock); const CasDiskLocation Location{InsertOffset, ChunkSize}; m_LocationMap[ChunkHash] = Location; CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; m_TotalSize.fetch_add(static_cast(ChunkSize)); m_CasLog.Append(IndexEntry); return CasStore::InsertResult{.New = true}; } CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } IoBuffer 
CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); } // Not found return IoBuffer(); } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { return true; } return false; } void CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would // benefit from a better implementation which removes // items incrementally for large sets, especially when // we're likely to already have a large proportion of the // chunks in the set InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void CasContainerStrategy::Flush() { m_CasLog.Flush(); m_SmallObjectIndex.Flush(); m_SmallObjectFile.Flush(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { const uint64_t WindowSize = 4 * 1024 * 1024; uint64_t WindowStart = 0; uint64_t WindowEnd = WindowSize; const uint64_t FileSize = m_SmallObjectFile.FileSize(); std::vector BigChunks; std::vector BadChunks; // We do a read sweep through the payloads file and validate // any entries that are contained within each segment, with // the assumption that most entries will be checked in this // pass. An alternative strategy would be to use memory mapping. 
{ IoBuffer ReadBuffer{WindowSize}; void* BufferBase = ReadBuffer.MutableData(); RwLock::SharedLockScope _(m_LocationMapLock); do { const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); for (auto& Entry : m_LocationMap) { const uint64_t EntryOffset = Entry.second.GetOffset(); if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); if (EntryEnd >= WindowEnd) { BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); continue; } const IoHash ComputedHash = IoHash::HashBuffer(reinterpret_cast(BufferBase) + Entry.second.GetOffset() - WindowStart, Entry.second.GetSize()); if (Entry.first != ComputedHash) { // Hash mismatch BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); } } } WindowStart += WindowSize; WindowEnd += WindowSize; } while (WindowStart < FileSize); } // Deal with large chunks for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); if (Entry.Key != ComputedHash) { BadChunks.push_back(Entry); } } if (BadChunks.empty()) { return; } ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); // Deal with bad chunks by removing them from our lookup map std::vector BadChunkHashes; for (const CasDiskIndexEntry& Entry : BadChunks) { BadChunkHashes.push_back(Entry.Key); m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(Entry.Key); } // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCasChunks(BadChunkHashes); } void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { namespace fs = std::filesystem; // A naive garbage collection implementation that just copies evicted chunks // into a new container file. We probably need to partition the container file // into several parts to prevent needing to keep the entire container file during GC. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); RwLock::ExclusiveLockScope _(m_LocationMapLock); Flush(); std::vector Candidates; std::vector ChunksToKeep; std::vector ChunksToDelete; const uint64_t ChunkCount = m_LocationMap.size(); uint64_t TotalSize{}; Candidates.reserve(m_LocationMap.size()); for (auto& Entry : m_LocationMap) { Candidates.push_back(Entry.first); TotalSize += Entry.second.GetSize(); } ChunksToKeep.reserve(Candidates.size()); GcCtx.FilterCas(Candidates, [&ChunksToKeep, &ChunksToDelete](const IoHash& Hash, bool Keep) { if (Keep) { ChunksToKeep.push_back(Hash); } else { ChunksToDelete.push_back(Hash); } }); if (m_LocationMap.empty() || ChunksToKeep.size() == m_LocationMap.size()) { ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", ChunkCount, NiceBytes(TotalSize), m_Config.RootDirectory / m_ContainerBaseName); return; } const uint64_t NewChunkCount = ChunksToKeep.size(); uint64_t NewTotalSize = 0; for (const IoHash& Key : ChunksToKeep) { const CasDiskLocation& Loc = m_LocationMap[Key]; NewTotalSize += Loc.GetSize(); } std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); if (Error) { ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message()); return; } if (Space.Free < NewTotalSize + (64 << 20)) { ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", m_Config.RootDirectory / m_ContainerBaseName, 
NiceBytes(NewTotalSize), NiceBytes(Space.Free)); return; } const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!CollectSmallObjects) { ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_Config.RootDirectory / m_ContainerBaseName, ChunkCount - NewChunkCount, NiceBytes(TotalSize - NewTotalSize), ChunkCount, NiceBytes(TotalSize)); return; } fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas"); fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog"); { ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath); TCasLogFile TmpLog; BasicFile TmpObjectFile; bool IsNew = true; TmpLog.Open(TmpSlogPath, IsNew); TmpObjectFile.Open(TmpSobsPath, IsNew); std::vector Chunk; uint64_t NextInsertOffset{}; for (const IoHash& Key : ChunksToKeep) { const auto Entry = m_LocationMap.find(Key); const auto& Loc = Entry->second; Chunk.resize(Loc.GetSize()); m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset()); const uint64_t InsertOffset = NextInsertOffset; TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset); TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}}); NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); } } try { CloseContainer(); fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); fs::remove(SobsPath); fs::remove(SidxPath); fs::remove(SlogPath); fs::rename(TmpSobsPath, SobsPath); fs::rename(TmpSlogPath, SlogPath); { // Create a new empty index file BasicFile SidxFile; SidxFile.Open(SidxPath, true); } OpenContainer(false /* IsNewStore */); GcCtx.DeletedCas(ChunksToDelete); ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}", 
m_Config.RootDirectory / m_ContainerBaseName, ChunkCount - NewChunkCount, NiceBytes(TotalSize - NewTotalSize), ChunkCount, NiceBytes(TotalSize)); } catch (std::exception& Err) { ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); // Something went wrong, try create a new container OpenContainer(true /* IsNewStore */); GcCtx.DeletedCas(ChunksToDelete); GcCtx.DeletedCas(ChunksToKeep); } } void CasContainerStrategy::MakeSnapshot() { RwLock::SharedLockScope _(m_LocationMapLock); std::vector Entries{m_LocationMap.size()}; uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); } void CasContainerStrategy::OpenContainer(bool IsNewStore) { std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); m_SmallObjectFile.Open(SobsPath, IsNewStore); m_SmallObjectIndex.Open(SidxPath, IsNewStore); m_CasLog.Open(SlogPath, IsNewStore); // TODO: should validate integrity of container files here m_CurrentInsertOffset = 0; m_CurrentIndexOffset = 0; m_TotalSize = 0; m_LocationMap.clear(); uint64_t MaxFileOffset = 0; m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { if (Record.Flags & CasDiskIndexEntry::kTombstone) { m_TotalSize.fetch_sub(Record.Location.GetSize()); } else { m_TotalSize.fetch_add(Record.Location.GetSize()); m_LocationMap[Record.Key] = Record.Location; MaxFileOffset = std::max(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); } }); m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); } void 
CasContainerStrategy::CloseContainer()
{
    // Closes the payload file, the index file and the log, in that order
    m_SmallObjectFile.Close();
    m_SmallObjectIndex.Close();
    m_CasLog.Close();
}

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

// Inserts a set of small objects, verifies they can be read back both from
// the live store and after reopening it, then runs a garbage collection pass.
TEST_CASE("cas.compact.gc")
{
    ScopedTemporaryDirectory TempDir;

    CasStoreConfiguration CasConfig;
    CasConfig.RootDirectory = TempDir.Path();

    CreateDirectories(CasConfig.RootDirectory);

    const int kIterationCount = 1000;

    std::vector<IoHash> Keys(kIterationCount);

    {
        CasGc                Gc;
        CasContainerStrategy Cas(CasConfig, Gc);
        Cas.Initialize("test", 16, true);

        for (int i = 0; i < kIterationCount; ++i)
        {
            CbObjectWriter Cbo;
            Cbo << "id" << i;
            CbObject Obj = Cbo.Save();

            IoBuffer     ObjBuffer = Obj.GetBuffer().AsIoBuffer();
            const IoHash Hash      = HashBuffer(ObjBuffer);

            Cas.InsertChunk(ObjBuffer, Hash);

            Keys[i] = Hash;
        }

        for (int i = 0; i < kIterationCount; ++i)
        {
            IoBuffer Chunk = Cas.FindChunk(Keys[i]);

            CHECK(!!Chunk);

            CbObject Value = LoadCompactBinaryObject(Chunk);

            CHECK_EQ(Value["id"].AsInt32(), i);
        }
    }

    // Validate that we can still read the inserted data after closing
    // the original cas store

    {
        CasGc                Gc;
        CasContainerStrategy Cas(CasConfig, Gc);
        Cas.Initialize("test", 16, false);

        for (int i = 0; i < kIterationCount; ++i)
        {
            IoBuffer Chunk = Cas.FindChunk(Keys[i]);

            CHECK(!!Chunk);

            CbObject Value = LoadCompactBinaryObject(Chunk);

            CHECK_EQ(Value["id"].AsInt32(), i);
        }

        GcContext Ctx;
        Cas.CollectGarbage(Ctx);
    }
}

// Verifies that the reported storage size equals the sum of the inserted
// chunk sizes, both for the live store and after reopening it.
TEST_CASE("cas.compact.totalsize")
{
    std::random_device rd;
    std::mt19937       g(rd());

    // Produces a buffer holding a random permutation of 0..Count-1
    const auto CreateChunk = [&](uint64_t Size) -> IoBuffer {
        const size_t          Count = static_cast<size_t>(Size / sizeof(uint32_t));
        std::vector<uint32_t> Values;
        Values.resize(Count);
        for (size_t Idx = 0; Idx < Count; ++Idx)
        {
            Values[Idx] = static_cast<uint32_t>(Idx);
        }
        std::shuffle(Values.begin(), Values.end(), g);

        return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
    };

    ScopedTemporaryDirectory TempDir;

    CasStoreConfiguration CasConfig;
    CasConfig.RootDirectory = TempDir.Path();

    CreateDirectories(CasConfig.RootDirectory);

    const uint64_t kChunkSize  = 1024;
    const int32_t  kChunkCount = 16;

    {
        CasGc                Gc;
        CasContainerStrategy Cas(CasConfig, Gc);
        Cas.Initialize("test", 16, true);

        for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
        {
            IoBuffer     Chunk        = CreateChunk(kChunkSize);
            const IoHash Hash         = HashBuffer(Chunk);
            auto         InsertResult = Cas.InsertChunk(Chunk, Hash);

            // Report through the test framework rather than asserting
            CHECK(InsertResult.New);
        }

        const uint64_t TotalSize = Cas.StorageSize().DiskSize;
        CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
    }

    {
        CasGc                Gc;
        CasContainerStrategy Cas(CasConfig, Gc);
        Cas.Initialize("test", 16, false);

        const uint64_t TotalSize = Cas.StorageSize().DiskSize;
        CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
    }
}

#endif

// Intentionally empty; presumably referenced from elsewhere so that this
// translation unit (and its tests) gets linked in - confirm against callers.
void
compactcas_forcelink()
{
}

}  // namespace zen