aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-13 12:57:51 +0200
committerGitHub Enterprise <[email protected]>2026-04-13 12:57:51 +0200
commit87248f58b4551870af8f08aac3e38e8887e32073 (patch)
tree2e0d7d18b5397ff7ebec3b5a30668af47dfb5941 /src/zencompute
parent5.8.4-pre1 (diff)
downloadzen-87248f58b4551870af8f08aac3e38e8887e32073.tar.xz
zen-87248f58b4551870af8f08aac3e38e8887e32073.zip
Add MemoryCidStore and ChunkStore interface (#940)
This PR introduces an in-memory `CidStore` option primarily for use with compute, to avoid hitting disk for ephemeral data which is not really worth persisting. And in particular not worth paying the critical path cost of persistence. - **MemoryCidStore**: In-memory CidStore implementation backed by a hash map, optionally layered over a standard CidStore. Writes to the backing store are dispatched asynchronously via a dedicated flush thread to avoid blocking callers on disk I/O. Reads check memory first, then fall back to the backing store without caching the result. - **ChunkStore interface**: Extract `ChunkStore` abstract class (`AddChunk`, `ContainsChunk`, `FilterChunks`) and `FallbackChunkResolver` into `zenstore.h` so `HttpComputeService` can accept different storage backends for action inputs vs worker binaries. `CidStore` and `MemoryCidStore` both implement `ChunkStore`. - **Compute service wiring**: `HttpComputeService` takes two `ChunkStore&` params (action + worker). The compute server uses `MemoryCidStore` for actions (no disk persistence needed) and disk-backed `CidStore` for workers (cross-action reuse). The storage server passes its `CidStore` for both (unchanged behavior).
Diffstat (limited to 'src/zencompute')
-rw-r--r--src/zencompute/httpcomputeservice.cpp46
-rw-r--r--src/zencompute/include/zencompute/httpcomputeservice.h5
2 files changed, 30 insertions, 21 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index 1d28e7137..6cb975dd3 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -21,12 +21,14 @@
# include <zencore/thread.h>
# include <zencore/trace.h>
# include <zencore/uid.h>
-# include <zenstore/cidstore.h>
+# include <zenstore/hashkeyset.h>
+# include <zenstore/zenstore.h>
# include <zentelemetry/stats.h>
# include <algorithm>
# include <span>
# include <unordered_map>
+# include <utility>
# include <vector>
using namespace std::literals;
@@ -45,7 +47,9 @@ auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSe
struct HttpComputeService::Impl
{
HttpComputeService* m_Self;
- CidStore& m_CidStore;
+ ChunkStore& m_ActionStore;
+ ChunkStore& m_WorkerStore;
+ FallbackChunkResolver m_CombinedResolver;
IHttpStatsService& m_StatsService;
LoggerRef m_Log;
std::filesystem::path m_BaseDir;
@@ -109,19 +113,22 @@ struct HttpComputeService::Impl
void RegisterRoutes();
- Impl(HttpComputeService* Self,
- CidStore& InCidStore,
- IHttpStatsService& StatsService,
- const std::filesystem::path& BaseDir,
- int32_t MaxConcurrentActions)
+ Impl(HttpComputeService* Self,
+ ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
+ IHttpStatsService& StatsService,
+ std::filesystem::path BaseDir,
+ int32_t MaxConcurrentActions)
: m_Self(Self)
- , m_CidStore(InCidStore)
+ , m_ActionStore(InActionStore)
+ , m_WorkerStore(InWorkerStore)
+ , m_CombinedResolver(InActionStore, InWorkerStore)
, m_StatsService(StatsService)
, m_Log(logging::Get("compute"))
- , m_BaseDir(BaseDir)
- , m_ComputeService(InCidStore)
+ , m_BaseDir(std::move(BaseDir))
+ , m_ComputeService(m_CombinedResolver)
{
- m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions);
+ m_ComputeService.AddLocalRunner(m_CombinedResolver, m_BaseDir / "local", MaxConcurrentActions);
m_ComputeService.WaitUntilReady();
m_StatsService.RegisterHandler("compute", *m_Self);
RegisterRoutes();
@@ -499,7 +506,7 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording");
+ m_ComputeService.StartRecording(m_CombinedResolver, m_BaseDir / "recording");
return HttpReq.WriteResponse(HttpResponseCode::OK);
},
@@ -1028,11 +1035,12 @@ HttpComputeService::Impl::RegisterRoutes()
//////////////////////////////////////////////////////////////////////////
-HttpComputeService::HttpComputeService(CidStore& InCidStore,
+HttpComputeService::HttpComputeService(ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
IHttpStatsService& StatsService,
const std::filesystem::path& BaseDir,
int32_t MaxConcurrentActions)
-: m_Impl(std::make_unique<Impl>(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions))
+: m_Impl(std::make_unique<Impl>(this, InActionStore, InWorkerStore, StatsService, BaseDir, MaxConcurrentActions))
{
}
@@ -1233,7 +1241,7 @@ HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, c
OutStats.Bytes += CompressedSize;
++OutStats.Count;
- const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+ const ChunkStore::InsertResult InsertResult = m_ActionStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
@@ -1251,7 +1259,7 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto
ActionObj.IterateAttachments([&](CbFieldView Field) {
const IoHash FileHash = Field.AsHash();
- if (!m_CidStore.ContainsChunk(FileHash))
+ if (!m_ActionStore.ContainsChunk(FileHash))
{
NeedList.push_back(FileHash);
}
@@ -1501,7 +1509,7 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
CbPackage WorkerPackage;
WorkerPackage.SetObject(WorkerSpec);
- m_CidStore.FilterChunks(ChunkSet);
+ m_WorkerStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
{
@@ -1550,8 +1558,8 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+ const ChunkStore::InsertResult InsertResult =
+ m_WorkerStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
if (InsertResult.New)
{
diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h
index de85a295f..db3fce3c2 100644
--- a/src/zencompute/include/zencompute/httpcomputeservice.h
+++ b/src/zencompute/include/zencompute/httpcomputeservice.h
@@ -15,7 +15,7 @@
# include <memory>
namespace zen {
-class CidStore;
+class ChunkStore;
}
namespace zen::compute {
@@ -26,7 +26,8 @@ namespace zen::compute {
class HttpComputeService : public HttpService, public IHttpStatsProvider, public IWebSocketHandler, public IComputeCompletionObserver
{
public:
- HttpComputeService(CidStore& InCidStore,
+ HttpComputeService(ChunkStore& InActionStore,
+ ChunkStore& InWorkerStore,
IHttpStatsService& StatsService,
const std::filesystem::path& BaseDir,
int32_t MaxConcurrentActions = 0);