From 87248f58b4551870af8f08aac3e38e8887e32073 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 13 Apr 2026 12:57:51 +0200 Subject: Add MemoryCidStore and ChunkStore interface (#940) This PR introduces an in-memory `CidStore` option primarily for use with compute, to avoid hitting disk for ephemeral data which is not really worth persisting. And in particular not worth paying the critical path cost of persistence. - **MemoryCidStore**: In-memory CidStore implementation backed by a hash map, optionally layered over a standard CidStore. Writes to the backing store are dispatched asynchronously via a dedicated flush thread to avoid blocking callers on disk I/O. Reads check memory first, then fall back to the backing store without caching the result. - **ChunkStore interface**: Extract `ChunkStore` abstract class (`AddChunk`, `ContainsChunk`, `FilterChunks`) and `FallbackChunkResolver` into `zenstore.h` so `HttpComputeService` can accept different storage backends for action inputs vs worker binaries. `CidStore` and `MemoryCidStore` both implement `ChunkStore`. - **Compute service wiring**: `HttpComputeService` takes two `ChunkStore&` params (action + worker). The compute server uses `MemoryCidStore` for actions (no disk persistence needed) and disk-backed `CidStore` for workers (cross-action reuse). The storage server passes its `CidStore` for both (unchanged behavior). --- src/zenstore/memorycidstore.cpp | 143 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 src/zenstore/memorycidstore.cpp (limited to 'src/zenstore/memorycidstore.cpp') diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp new file mode 100644 index 000000000..b4832029b --- /dev/null +++ b/src/zenstore/memorycidstore.cpp @@ -0,0 +1,143 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include +#include + +namespace zen { + +MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore) +{ + if (m_BackingStore) + { + m_FlushThreadEnabled = true; + m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this); + } +} + +MemoryCidStore::~MemoryCidStore() +{ + m_FlushThreadEnabled = false; + m_FlushEvent.Set(); + if (m_FlushThread.joinable()) + { + m_FlushThread.join(); + } +} + +MemoryCidStore::InsertResult +MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) +{ + bool IsNew = false; + + m_Lock.WithExclusiveLock([&] { + auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData); + IsNew = Inserted; + }); + + if (m_BackingStore) + { + std::lock_guard Lock(m_FlushLock); + m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash}); + m_FlushEvent.Set(); + } + + return {.New = IsNew}; +} + +std::vector +MemoryCidStore::AddChunks(std::span ChunkDatas, std::span RawHashes) +{ + std::vector Results; + Results.reserve(ChunkDatas.size()); + + for (size_t i = 0; i < ChunkDatas.size(); ++i) + { + Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i])); + } + + return Results; +} + +IoBuffer +MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId) +{ + IoBuffer Result; + + m_Lock.WithSharedLock([&] { + auto It = m_Chunks.find(DecompressedId); + if (It != m_Chunks.end()) + { + Result = It->second; + } + }); + + if (!Result && m_BackingStore) + { + Result = m_BackingStore->FindChunkByCid(DecompressedId); + } + + return Result; +} + +bool +MemoryCidStore::ContainsChunk(const IoHash& DecompressedId) +{ + bool Found = false; + + m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); }); + + if (!Found && m_BackingStore) + { + Found = m_BackingStore->ContainsChunk(DecompressedId); + } + + return Found; +} + +void +MemoryCidStore::FilterChunks(HashKeySet& InOutChunks) +{ + // Remove hashes that are present in our memory store + m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); }); + + // Delegate remainder to backing store + if (m_BackingStore && !InOutChunks.IsEmpty()) + { + m_BackingStore->FilterChunks(InOutChunks); + } +} + +void +MemoryCidStore::FlushThreadFunction() +{ + SetCurrentThreadName("MemCidFlush"); + + while (m_FlushThreadEnabled) + { + m_FlushEvent.Wait(); + + std::vector Batch; + { + std::lock_guard Lock(m_FlushLock); + Batch.swap(m_FlushQueue); + } + + for (PendingWrite& Write : Batch) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } + } + + // Drain remaining writes on shutdown + std::vector Remaining; + { + std::lock_guard Lock(m_FlushLock); + Remaining.swap(m_FlushQueue); + } + for (PendingWrite& Write : Remaining) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } +} + +} // namespace zen -- cgit v1.2.3