diff options
Diffstat (limited to 'src/zenstore/memorycidstore.cpp')
| -rw-r--r-- | src/zenstore/memorycidstore.cpp | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp new file mode 100644 index 000000000..b4832029b --- /dev/null +++ b/src/zenstore/memorycidstore.cpp @@ -0,0 +1,143 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/hashkeyset.h> +#include <zenstore/memorycidstore.h> + +namespace zen { + +MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore) +{ + if (m_BackingStore) + { + m_FlushThreadEnabled = true; + m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this); + } +} + +MemoryCidStore::~MemoryCidStore() +{ + m_FlushThreadEnabled = false; + m_FlushEvent.Set(); + if (m_FlushThread.joinable()) + { + m_FlushThread.join(); + } +} + +MemoryCidStore::InsertResult +MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) +{ + bool IsNew = false; + + m_Lock.WithExclusiveLock([&] { + auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData); + IsNew = Inserted; + }); + + if (m_BackingStore) + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash}); + m_FlushEvent.Set(); + } + + return {.New = IsNew}; +} + +std::vector<MemoryCidStore::InsertResult> +MemoryCidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) +{ + std::vector<MemoryCidStore::InsertResult> Results; + Results.reserve(ChunkDatas.size()); + + for (size_t i = 0; i < ChunkDatas.size(); ++i) + { + Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i])); + } + + return Results; +} + +IoBuffer +MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId) +{ + IoBuffer Result; + + m_Lock.WithSharedLock([&] { + auto It = m_Chunks.find(DecompressedId); + if (It != m_Chunks.end()) + { + Result = It->second; + } + }); + + if (!Result && m_BackingStore) + { + Result = m_BackingStore->FindChunkByCid(DecompressedId); + } + + return Result; +} + +bool +MemoryCidStore::ContainsChunk(const IoHash& DecompressedId) +{ + bool Found = false; + + m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); }); + + if (!Found && m_BackingStore) + { + Found = m_BackingStore->ContainsChunk(DecompressedId); + } + + return Found; +} + +void +MemoryCidStore::FilterChunks(HashKeySet& InOutChunks) +{ + // Remove hashes that are present in our memory store + m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); }); + + // Delegate remainder to backing store + if (m_BackingStore && !InOutChunks.IsEmpty()) + { + m_BackingStore->FilterChunks(InOutChunks); + } +} + +void +MemoryCidStore::FlushThreadFunction() +{ + SetCurrentThreadName("MemCidFlush"); + + while (m_FlushThreadEnabled) + { + m_FlushEvent.Wait(); + + std::vector<PendingWrite> Batch; + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + Batch.swap(m_FlushQueue); + } + + for (PendingWrite& Write : Batch) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } + } + + // Drain remaining writes on shutdown + std::vector<PendingWrite> Remaining; + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + Remaining.swap(m_FlushQueue); + } + for (PendingWrite& Write : Remaining) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } +} + +} // namespace zen |