// Copyright Epic Games, Inc. All Rights Reserved. #include #include "CompactCas.h" #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config) : m_Config(Config) { } CasContainerStrategy::~CasContainerStrategy() { } void CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); m_PayloadAlignment = Alignment; std::string BaseName(ContainerBaseName); std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas"); std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx"); std::filesystem::path SlogPath = m_Config.RootDirectory / (BaseName + ".ulog"); m_SmallObjectFile.Open(SobsPath, IsNewStore); m_SmallObjectIndex.Open(SidxPath, IsNewStore); m_CasLog.Open(SlogPath, IsNewStore); // TODO: should validate integrity of container files here uint64_t MaxFileOffset = 0; { // This is not technically necessary (nobody should be accessing us from // another thread at this stage) but may help static analysis zen::RwLock::ExclusiveLockScope _(m_LocationMapLock); m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { m_LocationMap[Record.Key] = Record.Location; MaxFileOffset = std::max(MaxFileOffset, Record.Location.Offset + Record.Location.Size); }); } m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); m_IsInitialized = true; } CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { { RwLock::SharedLockScope _(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt != m_LocationMap.end()) { return CasStore::InsertResult{.New = false}; } } // New entry RwLock::ExclusiveLockScope _(m_InsertLock); const uint64_t InsertOffset = m_CurrentInsertOffset; m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); m_CurrentInsertOffset = (m_CurrentInsertOffset + ChunkSize + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); RwLock::ExclusiveLockScope __(m_LocationMapLock); CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)}; m_LocationMap[ChunkHash] = Location; CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; m_CasLog.Append(IndexEntry); return CasStore::InsertResult{.New = true}; } CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; return zen::IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); } // Not found return IoBuffer(); } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { return true; } return false; } void CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would // benefit from a better implementation which removes // items incrementally for large sets, especially when // we're likely to already have a large proportion of the // chunks in the set std::unordered_set HaveSet; for (const IoHash& Hash : InOutChunks.GetChunkSet()) { if (HaveChunk(Hash)) { HaveSet.insert(Hash); } } for (const IoHash& Hash : HaveSet) { InOutChunks.RemoveIfPresent(Hash); } } void CasContainerStrategy::Flush() { m_CasLog.Flush(); m_SmallObjectIndex.Flush(); m_SmallObjectFile.Flush(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { const uint64_t WindowSize = 4 * 1024 * 1024; uint64_t WindowStart = 0; uint64_t WindowEnd = WindowSize; const uint64_t FileSize = m_SmallObjectFile.FileSize(); std::vector BigChunks; std::vector BadChunks; // We do a read sweep through the payloads file and validate // any entries that are contained within each segment, with // the assumption that most entries will be checked in this // pass. An alternative strategy would be to use memory mapping. { IoBuffer ReadBuffer{WindowSize}; void* BufferBase = ReadBuffer.MutableData(); RwLock::SharedLockScope _(m_LocationMapLock); do { const uint64_t ChunkSize = zen::Min(WindowSize, FileSize - WindowStart); m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); for (auto& Entry : m_LocationMap) { const uint64_t EntryOffset = Entry.second.Offset; if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { const uint64_t EntryEnd = EntryOffset + Entry.second.Size; if (EntryEnd >= WindowEnd) { BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); continue; } const IoHash ComputedHash = IoHash::HashBuffer(BufferBase, Entry.second.Size); if (Entry.first != ComputedHash) { // Hash mismatch BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); } } } WindowStart += WindowSize; WindowEnd += WindowSize; } while (WindowStart < FileSize); } // Deal with large chunks for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); if (Entry.Key != ComputedHash) { BadChunks.push_back(Entry); } } // Deal with bad chunks by removing them from our lookup map std::vector BadChunkHashes; for (const CasDiskIndexEntry& Entry : BadChunks) { BadChunkHashes.push_back(Entry.Key); m_LocationMap.erase(Entry.Key); } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadChunks(BadChunkHashes); } void CasContainerStrategy::MakeSnapshot() { RwLock::SharedLockScope _(m_LocationMapLock); std::vector Entries{m_LocationMap.size()}; uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); } } // namespace zen