aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-13 12:57:51 +0200
committerGitHub Enterprise <[email protected]>2026-04-13 12:57:51 +0200
commit87248f58b4551870af8f08aac3e38e8887e32073 (patch)
tree2e0d7d18b5397ff7ebec3b5a30668af47dfb5941 /src
parent5.8.4-pre1 (diff)
downloadzen-87248f58b4551870af8f08aac3e38e8887e32073.tar.xz
zen-87248f58b4551870af8f08aac3e38e8887e32073.zip
Add MemoryCidStore and ChunkStore interface (#940)
This PR introduces an in-memory `CidStore` option primarily for use with compute, to avoid hitting disk for ephemeral data which is not really worth persisting. And in particular not worth paying the critical path cost of persistence. - **MemoryCidStore**: In-memory CidStore implementation backed by a hash map, optionally layered over a standard CidStore. Writes to the backing store are dispatched asynchronously via a dedicated flush thread to avoid blocking callers on disk I/O. Reads check memory first, then fall back to the backing store without caching the result. - **ChunkStore interface**: Extract `ChunkStore` abstract class (`AddChunk`, `ContainsChunk`, `FilterChunks`) and `FallbackChunkResolver` into `zenstore.h` so `HttpComputeService` can accept different storage backends for action inputs vs worker binaries. `CidStore` and `MemoryCidStore` both implement `ChunkStore`. - **Compute service wiring**: `HttpComputeService` takes two `ChunkStore&` params (action + worker). The compute server uses `MemoryCidStore` for actions (no disk persistence needed) and disk-backed `CidStore` for workers (cross-action reuse). The storage server passes its `CidStore` for both (unchanged behavior).
Diffstat (limited to 'src')
-rw-r--r--src/zencompute/httpcomputeservice.cpp46
-rw-r--r--src/zencompute/include/zencompute/httpcomputeservice.h5
-rw-r--r--src/zenserver/compute/computeserver.cpp12
-rw-r--r--src/zenserver/compute/computeserver.h5
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp2
-rw-r--r--src/zenstore/cidstore.cpp12
-rw-r--r--src/zenstore/include/zenstore/cidstore.h22
-rw-r--r--src/zenstore/include/zenstore/memorycidstore.h68
-rw-r--r--src/zenstore/include/zenstore/zenstore.h37
-rw-r--r--src/zenstore/memorycidstore.cpp143
-rw-r--r--src/zenstore/zenstore.cpp20
11 files changed, 331 insertions, 41 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index 1d28e7137..6cb975dd3 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -21,12 +21,14 @@
# include <zencore/thread.h>
# include <zencore/trace.h>
# include <zencore/uid.h>
-# include <zenstore/cidstore.h>
+# include <zenstore/hashkeyset.h>
+# include <zenstore/zenstore.h>
# include <zentelemetry/stats.h>
# include <algorithm>
# include <span>
# include <unordered_map>
+# include <utility>
# include <vector>
using namespace std::literals;
@@ -45,7 +47,9 @@ auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSe
struct HttpComputeService::Impl
{
HttpComputeService* m_Self;
- CidStore& m_CidStore;
+ ChunkStore& m_ActionStore;
+ ChunkStore& m_WorkerStore;
+ FallbackChunkResolver m_CombinedResolver;
IHttpStatsService& m_StatsService;
LoggerRef m_Log;
std::filesystem::path m_BaseDir;
@@ -109,19 +113,22 @@ struct HttpComputeService::Impl
void RegisterRoutes();
- Impl(HttpComputeService* Self,
- CidStore& InCidStore,
- IHttpStatsService& StatsService,
- const std::filesystem::path& BaseDir,
- int32_t MaxConcurrentActions)
+ Impl(HttpComputeService* Self,
+ ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
+ IHttpStatsService& StatsService,
+ std::filesystem::path BaseDir,
+ int32_t MaxConcurrentActions)
: m_Self(Self)
- , m_CidStore(InCidStore)
+ , m_ActionStore(InActionStore)
+ , m_WorkerStore(InWorkerStore)
+ , m_CombinedResolver(InActionStore, InWorkerStore)
, m_StatsService(StatsService)
, m_Log(logging::Get("compute"))
- , m_BaseDir(BaseDir)
- , m_ComputeService(InCidStore)
+ , m_BaseDir(std::move(BaseDir))
+ , m_ComputeService(m_CombinedResolver)
{
- m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions);
+ m_ComputeService.AddLocalRunner(m_CombinedResolver, m_BaseDir / "local", MaxConcurrentActions);
m_ComputeService.WaitUntilReady();
m_StatsService.RegisterHandler("compute", *m_Self);
RegisterRoutes();
@@ -499,7 +506,7 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording");
+ m_ComputeService.StartRecording(m_CombinedResolver, m_BaseDir / "recording");
return HttpReq.WriteResponse(HttpResponseCode::OK);
},
@@ -1028,11 +1035,12 @@ HttpComputeService::Impl::RegisterRoutes()
//////////////////////////////////////////////////////////////////////////
-HttpComputeService::HttpComputeService(CidStore& InCidStore,
+HttpComputeService::HttpComputeService(ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
IHttpStatsService& StatsService,
const std::filesystem::path& BaseDir,
int32_t MaxConcurrentActions)
-: m_Impl(std::make_unique<Impl>(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions))
+: m_Impl(std::make_unique<Impl>(this, InActionStore, InWorkerStore, StatsService, BaseDir, MaxConcurrentActions))
{
}
@@ -1233,7 +1241,7 @@ HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, c
OutStats.Bytes += CompressedSize;
++OutStats.Count;
- const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+ const ChunkStore::InsertResult InsertResult = m_ActionStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
@@ -1251,7 +1259,7 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto
ActionObj.IterateAttachments([&](CbFieldView Field) {
const IoHash FileHash = Field.AsHash();
- if (!m_CidStore.ContainsChunk(FileHash))
+ if (!m_ActionStore.ContainsChunk(FileHash))
{
NeedList.push_back(FileHash);
}
@@ -1501,7 +1509,7 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
CbPackage WorkerPackage;
WorkerPackage.SetObject(WorkerSpec);
- m_CidStore.FilterChunks(ChunkSet);
+ m_WorkerStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
{
@@ -1550,8 +1558,8 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+ const ChunkStore::InsertResult InsertResult =
+ m_WorkerStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h
index de85a295f..db3fce3c2 100644
--- a/src/zencompute/include/zencompute/httpcomputeservice.h
+++ b/src/zencompute/include/zencompute/httpcomputeservice.h
@@ -15,7 +15,7 @@
# include <memory>
namespace zen {
-class CidStore;
+class ChunkStore;
}
namespace zen::compute {
@@ -26,7 +26,8 @@ namespace zen::compute {
class HttpComputeService : public HttpService, public IHttpStatsProvider, public IWebSocketHandler, public IComputeCompletionObserver
{
public:
- HttpComputeService(CidStore& InCidStore,
+ HttpComputeService(ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
IHttpStatsService& StatsService,
const std::filesystem::path& BaseDir,
int32_t MaxConcurrentActions = 0);
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index 1673cea6c..7296098e0 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -444,11 +444,12 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
ZEN_TRACE_CPU("ZenComputeServer::InitializeServices");
ZEN_INFO("initializing compute services");
- CidStoreConfiguration Config;
- Config.RootDirectory = m_DataRoot / "cas";
+ m_ActionStore = std::make_unique<MemoryCidStore>();
- m_CidStore = std::make_unique<CidStore>(m_GcManager);
- m_CidStore->Initialize(Config);
+ CidStoreConfiguration WorkerStoreConfig;
+ WorkerStoreConfig.RootDirectory = m_DataRoot / "cas";
+ m_WorkerStore = std::make_unique<CidStore>(m_GcManager);
+ m_WorkerStore->Initialize(WorkerStoreConfig);
if (!ServerConfig.IdmsEndpoint.empty())
{
@@ -476,7 +477,8 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
std::make_unique<zen::compute::HttpOrchestratorService>(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket);
ZEN_INFO("instantiating function service");
- m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_CidStore,
+ m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_ActionStore,
+ *m_WorkerStore,
m_StatsService,
ServerConfig.DataDir / "functions",
ServerConfig.MaxConcurrentActions);
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
index 8f4edc0f0..38f93bc36 100644
--- a/src/zenserver/compute/computeserver.h
+++ b/src/zenserver/compute/computeserver.h
@@ -10,6 +10,7 @@
# include <zencore/system.h>
# include <zenhttp/httpwsclient.h>
# include <zenstore/gc.h>
+# include <zenstore/memorycidstore.h>
# include "frontend/frontend.h"
namespace cxxopts {
@@ -41,7 +42,6 @@ class NomadProvisioner;
namespace zen {
-class CidStore;
class HttpApiService;
struct ZenComputeServerConfig : public ZenServerConfig
@@ -131,7 +131,8 @@ public:
private:
GcManager m_GcManager;
GcScheduler m_GcScheduler{m_GcManager};
- std::unique_ptr<CidStore> m_CidStore;
+ std::unique_ptr<MemoryCidStore> m_ActionStore;
+ std::unique_ptr<CidStore> m_WorkerStore;
std::unique_ptr<HttpApiService> m_ApiService;
std::unique_ptr<zen::compute::HttpComputeService> m_ComputeService;
std::unique_ptr<zen::compute::HttpOrchestratorService> m_OrchestratorService;
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
index 6b1da5f12..5cd759c11 100644
--- a/src/zenserver/storage/zenstorageserver.cpp
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -325,7 +325,7 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions
ZEN_OTEL_SPAN("InitializeComputeService");
m_HttpComputeService =
- std::make_unique<compute::HttpComputeService>(*m_CidStore, m_StatsService, ServerOptions.DataDir / "functions");
+ std::make_unique<compute::HttpComputeService>(*m_CidStore, *m_CidStore, m_StatsService, ServerOptions.DataDir / "functions");
}
#endif
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index b20d8f565..ac8a75a58 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -188,12 +188,24 @@ CidStore::Initialize(const CidStoreConfiguration& Config)
}
CidStore::InsertResult
+CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash)
+{
+ return m_Impl->AddChunk(ChunkData, RawHash, InsertMode::kMayBeMovedInPlace);
+}
+
+CidStore::InsertResult
CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode)
{
return m_Impl->AddChunk(ChunkData, RawHash, Mode);
}
std::vector<CidStore::InsertResult>
+CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes)
+{
+ return m_Impl->AddChunks(ChunkDatas, RawHashes, InsertMode::kMayBeMovedInPlace);
+}
+
+std::vector<CidStore::InsertResult>
CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode)
{
return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode);
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index d54062476..c00e0449f 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -58,16 +58,14 @@ struct CidStoreConfiguration
*
*/
-class CidStore final : public ChunkResolver, public StatsProvider
+class CidStore final : public ChunkStore, public StatsProvider
{
public:
CidStore(GcManager& Gc);
~CidStore();
- struct InsertResult
- {
- bool New = false;
- };
+ using InsertResult = ChunkStore::InsertResult;
+
enum class InsertMode
{
kCopyOnly,
@@ -75,17 +73,17 @@ public:
};
void Initialize(const CidStoreConfiguration& Config);
- InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas,
- std::span<IoHash> RawHashes,
- InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override;
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode);
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override;
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit);
- bool ContainsChunk(const IoHash& DecompressedId);
- void FilterChunks(HashKeySet& InOutChunks);
+ bool ContainsChunk(const IoHash& DecompressedId) override;
+ void FilterChunks(HashKeySet& InOutChunks) override;
void Flush();
CidStoreSize TotalSize() const;
CidStoreStats Stats() const;
diff --git a/src/zenstore/include/zenstore/memorycidstore.h b/src/zenstore/include/zenstore/memorycidstore.h
new file mode 100644
index 000000000..0311274d5
--- /dev/null
+++ b/src/zenstore/include/zenstore/memorycidstore.h
@@ -0,0 +1,68 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "cidstore.h"
+#include "zenstore.h"
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/thread.h>
+
+#include <deque>
+#include <span>
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+class HashKeySet;
+
+/** Memory-backed chunk store.
+ *
+ * Stores chunks in an in-memory hash map, optionally layered over a
+ * standard CidStore for write-through and read fallback. When a backing
+ * store is provided:
+ *
+ * - AddChunk writes to memory and asynchronously to the backing store.
+ * - FindChunkByCid checks memory first, then falls back to the backing store.
+ * - ContainsChunk and FilterChunks check memory first, then the backing store.
+ *
+ * The memory store does NOT cache read-through results from the backing store.
+ * Only chunks explicitly added via AddChunk/AddChunks are held in memory.
+ */
+
+class MemoryCidStore : public ChunkStore
+{
+public:
+ explicit MemoryCidStore(CidStore* BackingStore = nullptr);
+ ~MemoryCidStore();
+
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override;
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override;
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ bool ContainsChunk(const IoHash& DecompressedId) override;
+ void FilterChunks(HashKeySet& InOutChunks) override;
+
+private:
+ RwLock m_Lock;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Chunks;
+ CidStore* m_BackingStore = nullptr;
+
+ // Async write-through to backing store
+ struct PendingWrite
+ {
+ IoBuffer Data;
+ IoHash Hash;
+ };
+
+ std::mutex m_FlushLock;
+ std::vector<PendingWrite> m_FlushQueue;
+ Event m_FlushEvent;
+ std::thread m_FlushThread;
+ std::atomic<bool> m_FlushThreadEnabled{false};
+
+ void FlushThreadFunction();
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h
index bed219b4b..95ae33a4a 100644
--- a/src/zenstore/include/zenstore/zenstore.h
+++ b/src/zenstore/include/zenstore/zenstore.h
@@ -4,19 +4,56 @@
#include <zencore/zencore.h>
+#include <span>
+#include <vector>
+
#define ZENSTORE_API
namespace zen {
+class HashKeySet;
class IoBuffer;
struct IoHash;
class ChunkResolver
{
public:
+ virtual ~ChunkResolver() = default;
virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) = 0;
};
+/** Abstract chunk store interface.
+ *
+ * Extends ChunkResolver with write and query operations. Both CidStore
+ * (disk-backed) and MemoryCidStore (in-memory) implement this interface,
+ * allowing callers to be agnostic about the storage backend.
+ */
+class ChunkStore : public ChunkResolver
+{
+public:
+ struct InsertResult
+ {
+ bool New = false;
+ };
+
+ virtual InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) = 0;
+ virtual std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) = 0;
+ virtual bool ContainsChunk(const IoHash& DecompressedId) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+};
+
+/** Composite resolver that tries a primary store first, then a fallback. */
+class FallbackChunkResolver : public ChunkResolver
+{
+public:
+ FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+private:
+ ChunkResolver& m_Primary;
+ ChunkResolver& m_Fallback;
+};
+
ZENSTORE_API void zenstore_forcelinktests();
} // namespace zen
diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp
new file mode 100644
index 000000000..b4832029b
--- /dev/null
+++ b/src/zenstore/memorycidstore.cpp
@@ -0,0 +1,143 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/hashkeyset.h>
+#include <zenstore/memorycidstore.h>
+
+namespace zen {
+
+MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore)
+{
+ if (m_BackingStore)
+ {
+ m_FlushThreadEnabled = true;
+ m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this);
+ }
+}
+
+MemoryCidStore::~MemoryCidStore()
+{
+ m_FlushThreadEnabled = false;
+ m_FlushEvent.Set();
+ if (m_FlushThread.joinable())
+ {
+ m_FlushThread.join();
+ }
+}
+
+MemoryCidStore::InsertResult
+MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash)
+{
+ bool IsNew = false;
+
+ m_Lock.WithExclusiveLock([&] {
+ auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData);
+ IsNew = Inserted;
+ });
+
+ if (m_BackingStore)
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash});
+ m_FlushEvent.Set();
+ }
+
+ return {.New = IsNew};
+}
+
+std::vector<MemoryCidStore::InsertResult>
+MemoryCidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes)
+{
+ std::vector<MemoryCidStore::InsertResult> Results;
+ Results.reserve(ChunkDatas.size());
+
+ for (size_t i = 0; i < ChunkDatas.size(); ++i)
+ {
+ Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i]));
+ }
+
+ return Results;
+}
+
+IoBuffer
+MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId)
+{
+ IoBuffer Result;
+
+ m_Lock.WithSharedLock([&] {
+ auto It = m_Chunks.find(DecompressedId);
+ if (It != m_Chunks.end())
+ {
+ Result = It->second;
+ }
+ });
+
+ if (!Result && m_BackingStore)
+ {
+ Result = m_BackingStore->FindChunkByCid(DecompressedId);
+ }
+
+ return Result;
+}
+
+bool
+MemoryCidStore::ContainsChunk(const IoHash& DecompressedId)
+{
+ bool Found = false;
+
+ m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); });
+
+ if (!Found && m_BackingStore)
+ {
+ Found = m_BackingStore->ContainsChunk(DecompressedId);
+ }
+
+ return Found;
+}
+
+void
+MemoryCidStore::FilterChunks(HashKeySet& InOutChunks)
+{
+ // Remove hashes that are present in our memory store
+ m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); });
+
+ // Delegate remainder to backing store
+ if (m_BackingStore && !InOutChunks.IsEmpty())
+ {
+ m_BackingStore->FilterChunks(InOutChunks);
+ }
+}
+
+void
+MemoryCidStore::FlushThreadFunction()
+{
+ SetCurrentThreadName("MemCidFlush");
+
+ while (m_FlushThreadEnabled)
+ {
+ m_FlushEvent.Wait();
+
+ std::vector<PendingWrite> Batch;
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ Batch.swap(m_FlushQueue);
+ }
+
+ for (PendingWrite& Write : Batch)
+ {
+ m_BackingStore->AddChunk(Write.Data, Write.Hash);
+ }
+ }
+
+ // Drain remaining writes on shutdown
+ std::vector<PendingWrite> Remaining;
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ Remaining.swap(m_FlushQueue);
+ }
+ for (PendingWrite& Write : Remaining)
+ {
+ m_BackingStore->AddChunk(Write.Data, Write.Hash);
+ }
+}
+
+} // namespace zen
diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp
index c563cc202..bf0c71211 100644
--- a/src/zenstore/zenstore.cpp
+++ b/src/zenstore/zenstore.cpp
@@ -2,6 +2,26 @@
#include "zenstore/zenstore.h"
+#include <zencore/iobuffer.h>
+
+namespace zen {
+
+FallbackChunkResolver::FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback) : m_Primary(Primary), m_Fallback(Fallback)
+{
+}
+
+IoBuffer
+FallbackChunkResolver::FindChunkByCid(const IoHash& DecompressedId)
+{
+ if (IoBuffer Result = m_Primary.FindChunkByCid(DecompressedId))
+ {
+ return Result;
+ }
+ return m_Fallback.FindChunkByCid(DecompressedId);
+}
+
+} // namespace zen
+
#if ZEN_WITH_TESTS
# include <zenstore/blockstore.h>