// Copyright Epic Games, Inc. All Rights Reserved. #include #include namespace zen { MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore) { if (m_BackingStore) { m_FlushThreadEnabled = true; m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this); } } MemoryCidStore::~MemoryCidStore() { m_FlushThreadEnabled = false; m_FlushEvent.Set(); if (m_FlushThread.joinable()) { m_FlushThread.join(); } } MemoryCidStore::InsertResult MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) { bool IsNew = false; m_Lock.WithExclusiveLock([&] { auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData); IsNew = Inserted; }); if (m_BackingStore) { std::lock_guard Lock(m_FlushLock); m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash}); m_FlushEvent.Set(); } return {.New = IsNew}; } std::vector MemoryCidStore::AddChunks(std::span ChunkDatas, std::span RawHashes) { std::vector Results; Results.reserve(ChunkDatas.size()); for (size_t i = 0; i < ChunkDatas.size(); ++i) { Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i])); } return Results; } IoBuffer MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId) { IoBuffer Result; m_Lock.WithSharedLock([&] { auto It = m_Chunks.find(DecompressedId); if (It != m_Chunks.end()) { Result = It->second; } }); if (!Result && m_BackingStore) { Result = m_BackingStore->FindChunkByCid(DecompressedId); } return Result; } bool MemoryCidStore::ContainsChunk(const IoHash& DecompressedId) { bool Found = false; m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); }); if (!Found && m_BackingStore) { Found = m_BackingStore->ContainsChunk(DecompressedId); } return Found; } void MemoryCidStore::FilterChunks(HashKeySet& InOutChunks) { // Remove hashes that are present in our memory store m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); }); // Delegate remainder to backing store if (m_BackingStore && !InOutChunks.IsEmpty()) { m_BackingStore->FilterChunks(InOutChunks); } } void MemoryCidStore::FlushThreadFunction() { SetCurrentThreadName("MemCidFlush"); while (m_FlushThreadEnabled) { m_FlushEvent.Wait(); std::deque Batch; { std::lock_guard Lock(m_FlushLock); Batch.swap(m_FlushQueue); } for (PendingWrite& Write : Batch) { m_BackingStore->AddChunk(Write.Data, Write.Hash); } } // Drain remaining writes on shutdown std::deque Remaining; { std::lock_guard Lock(m_FlushLock); Remaining.swap(m_FlushQueue); } for (PendingWrite& Write : Remaining) { m_BackingStore->AddChunk(Write.Data, Write.Hash); } } } // namespace zen