aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/memorycidstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/memorycidstore.cpp')
-rw-r--r--src/zenstore/memorycidstore.cpp143
1 files changed, 143 insertions, 0 deletions
diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp
new file mode 100644
index 000000000..b4832029b
--- /dev/null
+++ b/src/zenstore/memorycidstore.cpp
@@ -0,0 +1,143 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/hashkeyset.h>
+#include <zenstore/memorycidstore.h>
+
+namespace zen {
+
+MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore)
+{
+ if (m_BackingStore)
+ {
+ m_FlushThreadEnabled = true;
+ m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this);
+ }
+}
+
+MemoryCidStore::~MemoryCidStore()
+{
+ m_FlushThreadEnabled = false;
+ m_FlushEvent.Set();
+ if (m_FlushThread.joinable())
+ {
+ m_FlushThread.join();
+ }
+}
+
+MemoryCidStore::InsertResult
+MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash)
+{
+ bool IsNew = false;
+
+ m_Lock.WithExclusiveLock([&] {
+ auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData);
+ IsNew = Inserted;
+ });
+
+ if (m_BackingStore)
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash});
+ m_FlushEvent.Set();
+ }
+
+ return {.New = IsNew};
+}
+
+std::vector<MemoryCidStore::InsertResult>
+MemoryCidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes)
+{
+ std::vector<MemoryCidStore::InsertResult> Results;
+ Results.reserve(ChunkDatas.size());
+
+ for (size_t i = 0; i < ChunkDatas.size(); ++i)
+ {
+ Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i]));
+ }
+
+ return Results;
+}
+
+IoBuffer
+MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId)
+{
+ IoBuffer Result;
+
+ m_Lock.WithSharedLock([&] {
+ auto It = m_Chunks.find(DecompressedId);
+ if (It != m_Chunks.end())
+ {
+ Result = It->second;
+ }
+ });
+
+ if (!Result && m_BackingStore)
+ {
+ Result = m_BackingStore->FindChunkByCid(DecompressedId);
+ }
+
+ return Result;
+}
+
+bool
+MemoryCidStore::ContainsChunk(const IoHash& DecompressedId)
+{
+ bool Found = false;
+
+ m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); });
+
+ if (!Found && m_BackingStore)
+ {
+ Found = m_BackingStore->ContainsChunk(DecompressedId);
+ }
+
+ return Found;
+}
+
+void
+MemoryCidStore::FilterChunks(HashKeySet& InOutChunks)
+{
+ // Remove hashes that are present in our memory store
+ m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); });
+
+ // Delegate remainder to backing store
+ if (m_BackingStore && !InOutChunks.IsEmpty())
+ {
+ m_BackingStore->FilterChunks(InOutChunks);
+ }
+}
+
+void
+MemoryCidStore::FlushThreadFunction()
+{
+ SetCurrentThreadName("MemCidFlush");
+
+ while (m_FlushThreadEnabled)
+ {
+ m_FlushEvent.Wait();
+
+ std::vector<PendingWrite> Batch;
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ Batch.swap(m_FlushQueue);
+ }
+
+ for (PendingWrite& Write : Batch)
+ {
+ m_BackingStore->AddChunk(Write.Data, Write.Hash);
+ }
+ }
+
+ // Drain remaining writes on shutdown
+ std::vector<PendingWrite> Remaining;
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ Remaining.swap(m_FlushQueue);
+ }
+ for (PendingWrite& Write : Remaining)
+ {
+ m_BackingStore->AddChunk(Write.Data, Write.Hash);
+ }
+}
+
+} // namespace zen