// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstore/cidstore.h" #include #include #include #include #include #include namespace zen { struct CidStore::CidState { CidState(CasStore& InCasStore) : m_CasStore(InCasStore) {} struct IndexEntry { IoHash Uncompressed; IoHash Compressed; }; CasStore& m_CasStore; TCasLogFile m_LogFile; RwLock m_Lock; tsl::robin_map m_CidMap; CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) { const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); AddCompressedCid(DecompressedId, CompressedHash); return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; } void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { RwLock::ExclusiveLockScope _(m_Lock); m_CidMap.insert_or_assign(DecompressedId, Compressed); // TODO: it's pretty wasteful to log even idempotent updates // however we can't simply use the boolean returned by insert_or_assign // since there's not a 1:1 mapping between compressed and uncompressed // so if we want a last-write-wins policy then we have to log each update m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed}); } IoBuffer FindChunkByCid(const IoHash& DecompressedId) { IoHash CompressedHash; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) { CompressedHash = It->second; } } if (CompressedHash != IoHash::Zero) { return m_CasStore.FindChunk(CompressedHash); } return IoBuffer(); } bool ContainsChunk(const IoHash& DecompressedId) { RwLock::SharedLockScope _(m_Lock); return m_CidMap.find(DecompressedId) != m_CidMap.end(); } void InitializeIndex(const std::filesystem::path& RootDir) { zen::CreateDirectories(RootDir); std::filesystem::path SlogPath{RootDir / "cid.slog"}; bool IsNew = !std::filesystem::exists(SlogPath); m_LogFile.Open(SlogPath, IsNew); m_LogFile.Replay([&](const IndexEntry& Ie) { m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed); }); ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size()); } void Flush() { m_LogFile.Flush(); } }; ////////////////////////////////////////////////////////////////////////// CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique(InCasStore)) { m_Impl->InitializeIndex(RootDir); } CidStore::~CidStore() { } CidStore::InsertResult CidStore::AddChunk(CompressedBuffer& ChunkData) { return m_Impl->AddChunk(ChunkData); } void CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { m_Impl->AddCompressedCid(DecompressedId, Compressed); } IoBuffer CidStore::FindChunkByCid(const IoHash& DecompressedId) { return m_Impl->FindChunkByCid(DecompressedId); } bool CidStore::ContainsChunk(const IoHash& DecompressedId) { return m_Impl->ContainsChunk(DecompressedId); } void CidStore::Flush() { m_Impl->Flush(); } } // namespace zen