diff options
| -rw-r--r-- | src/zencompute/httpcomputeservice.cpp | 46 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/httpcomputeservice.h | 5 | ||||
| -rw-r--r-- | src/zenserver/compute/computeserver.cpp | 12 | ||||
| -rw-r--r-- | src/zenserver/compute/computeserver.h | 5 | ||||
| -rw-r--r-- | src/zenserver/storage/zenstorageserver.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 22 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/memorycidstore.h | 68 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/zenstore.h | 37 | ||||
| -rw-r--r-- | src/zenstore/memorycidstore.cpp | 143 | ||||
| -rw-r--r-- | src/zenstore/zenstore.cpp | 20 |
11 files changed, 331 insertions, 41 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp index 1d28e7137..6cb975dd3 100644 --- a/src/zencompute/httpcomputeservice.cpp +++ b/src/zencompute/httpcomputeservice.cpp @@ -21,12 +21,14 @@ # include <zencore/thread.h> # include <zencore/trace.h> # include <zencore/uid.h> -# include <zenstore/cidstore.h> +# include <zenstore/hashkeyset.h> +# include <zenstore/zenstore.h> # include <zentelemetry/stats.h> # include <algorithm> # include <span> # include <unordered_map> +# include <utility> # include <vector> using namespace std::literals; @@ -45,7 +47,9 @@ auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSe struct HttpComputeService::Impl { HttpComputeService* m_Self; - CidStore& m_CidStore; + ChunkStore& m_ActionStore; + ChunkStore& m_WorkerStore; + FallbackChunkResolver m_CombinedResolver; IHttpStatsService& m_StatsService; LoggerRef m_Log; std::filesystem::path m_BaseDir; @@ -109,19 +113,22 @@ struct HttpComputeService::Impl void RegisterRoutes(); - Impl(HttpComputeService* Self, - CidStore& InCidStore, - IHttpStatsService& StatsService, - const std::filesystem::path& BaseDir, - int32_t MaxConcurrentActions) + Impl(HttpComputeService* Self, + ChunkStore& InActionStore, + ChunkStore& InWorkerStore, + IHttpStatsService& StatsService, + std::filesystem::path BaseDir, + int32_t MaxConcurrentActions) : m_Self(Self) - , m_CidStore(InCidStore) + , m_ActionStore(InActionStore) + , m_WorkerStore(InWorkerStore) + , m_CombinedResolver(InActionStore, InWorkerStore) , m_StatsService(StatsService) , m_Log(logging::Get("compute")) - , m_BaseDir(BaseDir) - , m_ComputeService(InCidStore) + , m_BaseDir(std::move(BaseDir)) + , m_ComputeService(m_CombinedResolver) { - m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions); + m_ComputeService.AddLocalRunner(m_CombinedResolver, m_BaseDir / "local", MaxConcurrentActions); m_ComputeService.WaitUntilReady(); m_StatsService.RegisterHandler("compute", *m_Self); RegisterRoutes(); @@ -499,7 +506,7 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::Forbidden); } - m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording"); + m_ComputeService.StartRecording(m_CombinedResolver, m_BaseDir / "recording"); return HttpReq.WriteResponse(HttpResponseCode::OK); }, @@ -1028,11 +1035,12 @@ HttpComputeService::Impl::RegisterRoutes() ////////////////////////////////////////////////////////////////////////// -HttpComputeService::HttpComputeService(CidStore& InCidStore, +HttpComputeService::HttpComputeService(ChunkStore& InActionStore, + ChunkStore& InWorkerStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir, int32_t MaxConcurrentActions) -: m_Impl(std::make_unique<Impl>(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions)) +: m_Impl(std::make_unique<Impl>(this, InActionStore, InWorkerStore, StatsService, BaseDir, MaxConcurrentActions)) { } @@ -1233,7 +1241,7 @@ HttpComputeService::Impl::IngestPackageAttachments(HttpServerRequest& HttpReq, c OutStats.Bytes += CompressedSize; ++OutStats.Count; - const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); + const ChunkStore::InsertResult InsertResult = m_ActionStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { @@ -1251,7 +1259,7 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); - if (!m_CidStore.ContainsChunk(FileHash)) + if (!m_ActionStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } @@ -1501,7 +1509,7 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const CbPackage WorkerPackage; WorkerPackage.SetObject(WorkerSpec); - m_CidStore.FilterChunks(ChunkSet); + m_WorkerStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { @@ -1550,8 +1558,8 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); + const ChunkStore::InsertResult InsertResult = + m_WorkerStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h index de85a295f..db3fce3c2 100644 --- a/src/zencompute/include/zencompute/httpcomputeservice.h +++ b/src/zencompute/include/zencompute/httpcomputeservice.h @@ -15,7 +15,7 @@ # include <memory> namespace zen { -class CidStore; +class ChunkStore; } namespace zen::compute { @@ -26,7 +26,8 @@ namespace zen::compute { class HttpComputeService : public HttpService, public IHttpStatsProvider, public IWebSocketHandler, public IComputeCompletionObserver { public: - HttpComputeService(CidStore& InCidStore, + HttpComputeService(ChunkStore& InActionStore, + ChunkStore& InWorkerStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir, int32_t MaxConcurrentActions = 0); diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 1673cea6c..7296098e0 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -444,11 +444,12 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) ZEN_TRACE_CPU("ZenComputeServer::InitializeServices"); ZEN_INFO("initializing compute services"); - CidStoreConfiguration Config; - Config.RootDirectory = m_DataRoot / "cas"; + m_ActionStore = std::make_unique<MemoryCidStore>(); - m_CidStore = std::make_unique<CidStore>(m_GcManager); - m_CidStore->Initialize(Config); + CidStoreConfiguration WorkerStoreConfig; + WorkerStoreConfig.RootDirectory = m_DataRoot / "cas"; + m_WorkerStore = std::make_unique<CidStore>(m_GcManager); + m_WorkerStore->Initialize(WorkerStoreConfig); if (!ServerConfig.IdmsEndpoint.empty()) { @@ -476,7 +477,8 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) std::make_unique<zen::compute::HttpOrchestratorService>(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket); ZEN_INFO("instantiating function service"); - m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_CidStore, + m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_ActionStore, + *m_WorkerStore, m_StatsService, ServerConfig.DataDir / "functions", ServerConfig.MaxConcurrentActions); diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h index 8f4edc0f0..38f93bc36 100644 --- a/src/zenserver/compute/computeserver.h +++ b/src/zenserver/compute/computeserver.h @@ -10,6 +10,7 @@ # include <zencore/system.h> # include <zenhttp/httpwsclient.h> # include <zenstore/gc.h> +# include <zenstore/memorycidstore.h> # include "frontend/frontend.h" namespace cxxopts { @@ -41,7 +42,6 @@ class NomadProvisioner; namespace zen { -class CidStore; class HttpApiService; struct ZenComputeServerConfig : public ZenServerConfig @@ -131,7 +131,8 @@ public: private: GcManager m_GcManager; GcScheduler m_GcScheduler{m_GcManager}; - std::unique_ptr<CidStore> m_CidStore; + std::unique_ptr<MemoryCidStore> m_ActionStore; + std::unique_ptr<CidStore> m_WorkerStore; std::unique_ptr<HttpApiService> m_ApiService; std::unique_ptr<zen::compute::HttpComputeService> m_ComputeService; std::unique_ptr<zen::compute::HttpOrchestratorService> m_OrchestratorService; diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp index 6b1da5f12..5cd759c11 100644 --- a/src/zenserver/storage/zenstorageserver.cpp +++ b/src/zenserver/storage/zenstorageserver.cpp @@ -325,7 +325,7 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions ZEN_OTEL_SPAN("InitializeComputeService"); m_HttpComputeService = - std::make_unique<compute::HttpComputeService>(*m_CidStore, m_StatsService, ServerOptions.DataDir / "functions"); + std::make_unique<compute::HttpComputeService>(*m_CidStore, *m_CidStore, m_StatsService, ServerOptions.DataDir / "functions"); } #endif diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index b20d8f565..ac8a75a58 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -188,12 +188,24 @@ CidStore::Initialize(const CidStoreConfiguration& Config) } CidStore::InsertResult +CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) +{ + return m_Impl->AddChunk(ChunkData, RawHash, InsertMode::kMayBeMovedInPlace); +} + +CidStore::InsertResult CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode) { return m_Impl->AddChunk(ChunkData, RawHash, Mode); } std::vector<CidStore::InsertResult> +CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) +{ + return m_Impl->AddChunks(ChunkDatas, RawHashes, InsertMode::kMayBeMovedInPlace); +} + +std::vector<CidStore::InsertResult> CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode) { return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode); diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index d54062476..c00e0449f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -58,16 +58,14 @@ struct CidStoreConfiguration * */ -class CidStore final : public ChunkResolver, public StatsProvider +class CidStore final : public ChunkStore, public StatsProvider { public: CidStore(GcManager& Gc); ~CidStore(); - struct InsertResult - { - bool New = false; - }; + using InsertResult = ChunkStore::InsertResult; + enum class InsertMode { kCopyOnly, @@ -75,17 +73,17 @@ public: }; void Initialize(const CidStoreConfiguration& Config); - InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); - std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, - std::span<IoHash> RawHashes, - InsertMode Mode = InsertMode::kMayBeMovedInPlace); - virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override; + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode); + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override; + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit); - bool ContainsChunk(const IoHash& DecompressedId); - void FilterChunks(HashKeySet& InOutChunks); + bool ContainsChunk(const IoHash& DecompressedId) override; + void FilterChunks(HashKeySet& InOutChunks) override; void Flush(); CidStoreSize TotalSize() const; CidStoreStats Stats() const; diff --git a/src/zenstore/include/zenstore/memorycidstore.h b/src/zenstore/include/zenstore/memorycidstore.h new file mode 100644 index 000000000..0311274d5 --- /dev/null +++ b/src/zenstore/include/zenstore/memorycidstore.h @@ -0,0 +1,68 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "cidstore.h" +#include "zenstore.h" + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/thread.h> + +#include <deque> +#include <span> +#include <thread> +#include <unordered_map> + +namespace zen { + +class HashKeySet; + +/** Memory-backed chunk store. + * + * Stores chunks in an in-memory hash map, optionally layered over a + * standard CidStore for write-through and read fallback. When a backing + * store is provided: + * + * - AddChunk writes to memory and asynchronously to the backing store. + * - FindChunkByCid checks memory first, then falls back to the backing store. + * - ContainsChunk and FilterChunks check memory first, then the backing store. + * + * The memory store does NOT cache read-through results from the backing store. + * Only chunks explicitly added via AddChunk/AddChunks are held in memory. + */ + +class MemoryCidStore : public ChunkStore +{ +public: + explicit MemoryCidStore(CidStore* BackingStore = nullptr); + ~MemoryCidStore(); + + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override; + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override; + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + bool ContainsChunk(const IoHash& DecompressedId) override; + void FilterChunks(HashKeySet& InOutChunks) override; + +private: + RwLock m_Lock; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Chunks; + CidStore* m_BackingStore = nullptr; + + // Async write-through to backing store + struct PendingWrite + { + IoBuffer Data; + IoHash Hash; + }; + + std::mutex m_FlushLock; + std::vector<PendingWrite> m_FlushQueue; + Event m_FlushEvent; + std::thread m_FlushThread; + std::atomic<bool> m_FlushThreadEnabled{false}; + + void FlushThreadFunction(); +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h index bed219b4b..95ae33a4a 100644 --- a/src/zenstore/include/zenstore/zenstore.h +++ b/src/zenstore/include/zenstore/zenstore.h @@ -4,19 +4,56 @@ #include <zencore/zencore.h> +#include <span> +#include <vector> + #define ZENSTORE_API namespace zen { +class HashKeySet; class IoBuffer; struct IoHash; class ChunkResolver { public: + virtual ~ChunkResolver() = default; virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) = 0; }; +/** Abstract chunk store interface. + * + * Extends ChunkResolver with write and query operations. Both CidStore + * (disk-backed) and MemoryCidStore (in-memory) implement this interface, + * allowing callers to be agnostic about the storage backend. + */ +class ChunkStore : public ChunkResolver +{ +public: + struct InsertResult + { + bool New = false; + }; + + virtual InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) = 0; + virtual std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) = 0; + virtual bool ContainsChunk(const IoHash& DecompressedId) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; +}; + +/** Composite resolver that tries a primary store first, then a fallback. */ +class FallbackChunkResolver : public ChunkResolver +{ +public: + FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + +private: + ChunkResolver& m_Primary; + ChunkResolver& m_Fallback; +}; + ZENSTORE_API void zenstore_forcelinktests(); } // namespace zen diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp new file mode 100644 index 000000000..b4832029b --- /dev/null +++ b/src/zenstore/memorycidstore.cpp @@ -0,0 +1,143 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/hashkeyset.h> +#include <zenstore/memorycidstore.h> + +namespace zen { + +MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore) +{ + if (m_BackingStore) + { + m_FlushThreadEnabled = true; + m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this); + } +} + +MemoryCidStore::~MemoryCidStore() +{ + m_FlushThreadEnabled = false; + m_FlushEvent.Set(); + if (m_FlushThread.joinable()) + { + m_FlushThread.join(); + } +} + +MemoryCidStore::InsertResult +MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) +{ + bool IsNew = false; + + m_Lock.WithExclusiveLock([&] { + auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData); + IsNew = Inserted; + }); + + if (m_BackingStore) + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash}); + m_FlushEvent.Set(); + } + + return {.New = IsNew}; +} + +std::vector<MemoryCidStore::InsertResult> +MemoryCidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) +{ + std::vector<MemoryCidStore::InsertResult> Results; + Results.reserve(ChunkDatas.size()); + + for (size_t i = 0; i < ChunkDatas.size(); ++i) + { + Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i])); + } + + return Results; +} + +IoBuffer +MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId) +{ + IoBuffer Result; + + m_Lock.WithSharedLock([&] { + auto It = m_Chunks.find(DecompressedId); + if (It != m_Chunks.end()) + { + Result = It->second; + } + }); + + if (!Result && m_BackingStore) + { + Result = m_BackingStore->FindChunkByCid(DecompressedId); + } + + return Result; +} + +bool +MemoryCidStore::ContainsChunk(const IoHash& DecompressedId) +{ + bool Found = false; + + m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); }); + + if (!Found && m_BackingStore) + { + Found = m_BackingStore->ContainsChunk(DecompressedId); + } + + return Found; +} + +void +MemoryCidStore::FilterChunks(HashKeySet& InOutChunks) +{ + // Remove hashes that are present in our memory store + m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); }); + + // Delegate remainder to backing store + if (m_BackingStore && !InOutChunks.IsEmpty()) + { + m_BackingStore->FilterChunks(InOutChunks); + } +} + +void +MemoryCidStore::FlushThreadFunction() +{ + SetCurrentThreadName("MemCidFlush"); + + while (m_FlushThreadEnabled) + { + m_FlushEvent.Wait(); + + std::vector<PendingWrite> Batch; + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + Batch.swap(m_FlushQueue); + } + + for (PendingWrite& Write : Batch) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } + } + + // Drain remaining writes on shutdown + std::vector<PendingWrite> Remaining; + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + Remaining.swap(m_FlushQueue); + } + for (PendingWrite& Write : Remaining) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } +} + +} // namespace zen diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp index c563cc202..bf0c71211 100644 --- a/src/zenstore/zenstore.cpp +++ b/src/zenstore/zenstore.cpp @@ -2,6 +2,26 @@ #include "zenstore/zenstore.h" +#include <zencore/iobuffer.h> + +namespace zen { + +FallbackChunkResolver::FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback) : m_Primary(Primary), m_Fallback(Fallback) +{ +} + +IoBuffer +FallbackChunkResolver::FindChunkByCid(const IoHash& DecompressedId) +{ + if (IoBuffer Result = m_Primary.FindChunkByCid(DecompressedId)) + { + return Result; + } + return m_Fallback.FindChunkByCid(DecompressedId); +} + +} // namespace zen + #if ZEN_WITH_TESTS # include <zenstore/blockstore.h> |