diff options
| author | Stefan Boberg <[email protected]> | 2026-04-13 12:57:51 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 12:57:51 +0200 |
| commit | 87248f58b4551870af8f08aac3e38e8887e32073 (patch) | |
| tree | 2e0d7d18b5397ff7ebec3b5a30668af47dfb5941 /src/zenstore/include | |
| parent | 5.8.4-pre1 (diff) | |
| download | zen-87248f58b4551870af8f08aac3e38e8887e32073.tar.xz zen-87248f58b4551870af8f08aac3e38e8887e32073.zip | |
Add MemoryCidStore and ChunkStore interface (#940)
This PR introduces an in-memory `CidStore` option primarily for use with compute, to avoid hitting disk for ephemeral data which is not really worth persisting. And in particular not worth paying the critical path cost of persistence.
- **MemoryCidStore**: In-memory CidStore implementation backed by a hash map, optionally layered over a standard CidStore. Writes to the backing store are dispatched asynchronously via a dedicated flush thread to avoid blocking callers on disk I/O. Reads check memory first, then fall back to the backing store without caching the result.
- **ChunkStore interface**: Extract `ChunkStore` abstract class (`AddChunk`, `ContainsChunk`, `FilterChunks`) and `FallbackChunkResolver` into `zenstore.h` so `HttpComputeService` can accept different storage backends for action inputs vs worker binaries. `CidStore` and `MemoryCidStore` both implement `ChunkStore`.
- **Compute service wiring**: `HttpComputeService` takes two `ChunkStore&` params (action + worker). The compute server uses `MemoryCidStore` for actions (no disk persistence needed) and disk-backed `CidStore` for workers (cross-action reuse). The storage server passes its `CidStore` for both (unchanged behavior).
Diffstat (limited to 'src/zenstore/include')
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 22 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/memorycidstore.h | 68 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/zenstore.h | 37 |
3 files changed, 115 insertions, 12 deletions
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index d54062476..c00e0449f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -58,16 +58,14 @@ struct CidStoreConfiguration * */ -class CidStore final : public ChunkResolver, public StatsProvider +class CidStore final : public ChunkStore, public StatsProvider { public: CidStore(GcManager& Gc); ~CidStore(); - struct InsertResult - { - bool New = false; - }; + using InsertResult = ChunkStore::InsertResult; + enum class InsertMode { kCopyOnly, @@ -75,17 +73,17 @@ public: }; void Initialize(const CidStoreConfiguration& Config); - InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); - std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, - std::span<IoHash> RawHashes, - InsertMode Mode = InsertMode::kMayBeMovedInPlace); - virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override; + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode); + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override; + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit); - bool ContainsChunk(const IoHash& DecompressedId); - void FilterChunks(HashKeySet& InOutChunks); + bool ContainsChunk(const IoHash& DecompressedId) override; + void FilterChunks(HashKeySet& InOutChunks) override; void Flush(); CidStoreSize TotalSize() const; CidStoreStats Stats() const; diff --git a/src/zenstore/include/zenstore/memorycidstore.h b/src/zenstore/include/zenstore/memorycidstore.h new file mode 100644 index 000000000..0311274d5 --- /dev/null +++ b/src/zenstore/include/zenstore/memorycidstore.h @@ -0,0 +1,68 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "cidstore.h" +#include "zenstore.h" + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/thread.h> + +#include <deque> +#include <span> +#include <thread> +#include <unordered_map> + +namespace zen { + +class HashKeySet; + +/** Memory-backed chunk store. + * + * Stores chunks in an in-memory hash map, optionally layered over a + * standard CidStore for write-through and read fallback. When a backing + * store is provided: + * + * - AddChunk writes to memory and asynchronously to the backing store. + * - FindChunkByCid checks memory first, then falls back to the backing store. + * - ContainsChunk and FilterChunks check memory first, then the backing store. + * + * The memory store does NOT cache read-through results from the backing store. + * Only chunks explicitly added via AddChunk/AddChunks are held in memory. + */ + +class MemoryCidStore : public ChunkStore +{ +public: + explicit MemoryCidStore(CidStore* BackingStore = nullptr); + ~MemoryCidStore(); + + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override; + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override; + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + bool ContainsChunk(const IoHash& DecompressedId) override; + void FilterChunks(HashKeySet& InOutChunks) override; + +private: + RwLock m_Lock; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Chunks; + CidStore* m_BackingStore = nullptr; + + // Async write-through to backing store + struct PendingWrite + { + IoBuffer Data; + IoHash Hash; + }; + + std::mutex m_FlushLock; + std::vector<PendingWrite> m_FlushQueue; + Event m_FlushEvent; + std::thread m_FlushThread; + std::atomic<bool> m_FlushThreadEnabled{false}; + + void FlushThreadFunction(); +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h index bed219b4b..95ae33a4a 100644 --- a/src/zenstore/include/zenstore/zenstore.h +++ b/src/zenstore/include/zenstore/zenstore.h @@ -4,19 +4,56 @@ #include <zencore/zencore.h> +#include <span> +#include <vector> + #define ZENSTORE_API namespace zen { +class HashKeySet; class IoBuffer; struct IoHash; class ChunkResolver { public: + virtual ~ChunkResolver() = default; virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) = 0; }; +/** Abstract chunk store interface. + * + * Extends ChunkResolver with write and query operations. Both CidStore + * (disk-backed) and MemoryCidStore (in-memory) implement this interface, + * allowing callers to be agnostic about the storage backend. + */ +class ChunkStore : public ChunkResolver +{ +public: + struct InsertResult + { + bool New = false; + }; + + virtual InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) = 0; + virtual std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) = 0; + virtual bool ContainsChunk(const IoHash& DecompressedId) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; +}; + +/** Composite resolver that tries a primary store first, then a fallback. */ +class FallbackChunkResolver : public ChunkResolver +{ +public: + FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + +private: + ChunkResolver& m_Primary; + ChunkResolver& m_Fallback; +}; + ZENSTORE_API void zenstore_forcelinktests(); } // namespace zen |