diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-20 12:33:03 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-20 12:33:03 +0200 |
| commit | 4c05d1041461b630cd5770dae5e8d03147d5674b (patch) | |
| tree | 3f5d6b1b4b2b3f167f94e98f902a5f60c2e3d753 /src/zenserver/upstream/upstreamcache.cpp | |
| parent | zen print fixes/improvements (#469) (diff) | |
| download | zen-4c05d1041461b630cd5770dae5e8d03147d5674b.tar.xz zen-4c05d1041461b630cd5770dae5e8d03147d5674b.zip | |
per namespace/project cas prep refactor (#470)
- Refactor so we can have more than one cas store for project store and cache.
- Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore
- Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub).
- Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize)
- Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace`
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 38 |
1 files changed, 21 insertions, 17 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 744b861dd..a1c460bc0 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1475,12 +1475,17 @@ namespace detail { class UpstreamCacheImpl final : public UpstreamCache { + struct EnqueuedRequest + { + UpstreamCacheRecord Record; + std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc; + }; + public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) - , m_CidStore(CidStore) { } @@ -1836,17 +1841,17 @@ public: } } - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc}); } else { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc)); } } } @@ -1900,7 +1905,7 @@ public: } private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); @@ -1918,7 +1923,7 @@ private: for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) + if (IoBuffer Payload = GetValueFunc(ValueContentId)) { Payloads.push_back(Payload); } @@ -1970,19 +1975,19 @@ private: for (;;) { - UpstreamCacheRecord CacheRecord; - if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + EnqueuedRequest Request; + if (m_UpstreamQueue.WaitAndDequeue(Request)) { try { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc)); } catch (const std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, + Request.Record.Namespace, + Request.Record.Key.Bucket, + Request.Record.Key.Hash, Err.what()); } } @@ -2076,7 +2081,7 @@ private: LoggerRef Log() { return m_Log; } - using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<EnqueuedRequest>; struct RunState { @@ -2102,7 +2107,6 @@ private: LoggerRef m_Log; UpstreamCacheOptions m_Options; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; @@ -2126,9 +2130,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con } std::unique_ptr<UpstreamCache> -CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) +CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) { - return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore); + return std::make_unique<UpstreamCacheImpl>(Options, CacheStore); } } // namespace zen |