// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstore/cidstore.h" #include #include #include #include #include #include #include #include namespace zen { struct CidStore::Impl { Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} struct IndexEntry { IoHash Uncompressed; IoHash Compressed; }; CasStore& m_CasStore; TCasLogFile m_LogFile; RwLock m_Lock; tsl::robin_map m_CidMap; CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) { const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); Payload.SetContentType(ZenContentType::kCompressedBinary); CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); AddCompressedCid(DecompressedId, CompressedHash); return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; } void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { ZEN_ASSERT(Compressed != IoHash::Zero); RwLock::ExclusiveLockScope _(m_Lock); auto It = m_CidMap.try_emplace(DecompressedId, Compressed); if (!It.second) { if (It.first.value() != Compressed) { It.first.value() = Compressed; } else { // No point logging an update that won't change anything return; } } // It's not ideal to do this while holding the lock in case // we end up in blocking I/O but that's for later LogMapping(DecompressedId, Compressed); } void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash) { ZEN_ASSERT(DecompressedId != CompressedHash); m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash}); } IoHash RemapCid(const IoHash& DecompressedId) { RwLock::SharedLockScope _(m_Lock); if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) { return It->second; } return IoHash::Zero; } IoBuffer FindChunkByCid(const IoHash& DecompressedId) { IoHash CompressedHash; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) { CompressedHash = It->second; } else { return {}; } } ZEN_ASSERT(CompressedHash != IoHash::Zero); return m_CasStore.FindChunk(CompressedHash); } bool ContainsChunk(const IoHash& DecompressedId) { IoHash CasHash = IoHash::Zero; { RwLock::SharedLockScope _(m_Lock); if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) { CasHash = It->second; } } return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false; } void InitializeIndex(const std::filesystem::path& RootDir) { CreateDirectories(RootDir); std::filesystem::path SlogPath{RootDir / "cid.slog"}; bool IsNew = !std::filesystem::exists(SlogPath); m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize())); uint64_t TombstoneCount = 0; uint64_t InvalidCount = 0; m_LogFile.Replay( [&](const IndexEntry& Entry) { if (Entry.Compressed != IoHash::Zero) { // Update m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); } else { if (Entry.Uncompressed != IoHash::Zero) { // Tombstone m_CidMap.erase(Entry.Uncompressed); ++TombstoneCount; } else { // Completely uninitialized entry with both hashes set to zero indicates a // problem. Might be an unwritten page due to BSOD or some other problem ++InvalidCount; } } }, 0); ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount); } void Flush() { m_LogFile.Flush(); } void Scrub(ScrubContext& Ctx) { if (Ctx.ScrubTimestamp() == m_LastScrubTime) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); CasChunkSet ChunkSet; { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_CidMap) { ChunkSet.AddChunkToSet(Kv.second); } } m_CasStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { // All good - we have all the chunks return; } ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed", ChunkSet.GetSize(), m_CidMap.size()); // Erase all mappings to chunks which are not present in the underlying CAS store // we do this by removing mappings from the in-memory lookup structure and also // by emitting tombstone records to the commit log std::vector BadChunks; { RwLock::SharedLockScope _(m_Lock); for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) { if (ChunkSet.ContainsChunk(It->second)) { const IoHash& BadHash = It->first; // Log a tombstone record LogMapping(BadHash, IoHash::Zero); BadChunks.push_back(BadHash); It = m_CidMap.erase(It); } else { ++It; } } } m_LogFile.Flush(); // TODO: Should compute a snapshot index here Ctx.ReportBadCasChunks(BadChunks); } void RemoveCids(CasChunkSet& CasChunks) { std::vector RemovedEntries; RemovedEntries.reserve(CasChunks.GetSize()); { RwLock::ExclusiveLockScope _(m_Lock); for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;) { if (CasChunks.ContainsChunk(It->second)) { RemovedEntries.push_back({It->first, IoHash::Zero}); It = m_CidMap.erase(It); continue; } ++It; } } m_LogFile.Append(RemovedEntries); } uint64_t m_LastScrubTime = 0; }; ////////////////////////////////////////////////////////////////////////// CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique(InCasStore)) { m_Impl->InitializeIndex(RootDir); } CidStore::~CidStore() { } CidStore::InsertResult CidStore::AddChunk(CompressedBuffer& ChunkData) { return m_Impl->AddChunk(ChunkData); } void CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { m_Impl->AddCompressedCid(DecompressedId, Compressed); } IoBuffer CidStore::FindChunkByCid(const IoHash& DecompressedId) { return m_Impl->FindChunkByCid(DecompressedId); } IoHash CidStore::RemapCid(const IoHash& DecompressedId) { return m_Impl->RemapCid(DecompressedId); } bool CidStore::ContainsChunk(const IoHash& DecompressedId) { return m_Impl->ContainsChunk(DecompressedId); } void CidStore::Flush() { m_Impl->Flush(); } void CidStore::Scrub(ScrubContext& Ctx) { m_Impl->Scrub(Ctx); } void CidStore::RemoveCids(CasChunkSet& CasChunks) { m_Impl->RemoveCids(CasChunks); } CasStoreSize CidStore::CasSize() const { return m_Impl->m_CasStore.TotalSize(); } } // namespace zen