diff options
| author | Liam Mitchell <[email protected]> | 2025-08-21 23:58:51 +0000 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2025-08-21 23:58:51 +0000 |
| commit | 33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch) | |
| tree | cfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenserver/upstream/upstreamcache.cpp | |
| parent | Fix changelog merge issues (diff) | |
| parent | avoid new in static IoBuffer (#472) (diff) | |
| download | zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip | |
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 38 |
1 files changed, 21 insertions, 17 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 744b861dd..a1c460bc0 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1475,12 +1475,17 @@ namespace detail { class UpstreamCacheImpl final : public UpstreamCache { + struct EnqueuedRequest + { + UpstreamCacheRecord Record; + std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc; + }; + public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) - , m_CidStore(CidStore) { } @@ -1836,17 +1841,17 @@ public: } } - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc}); } else { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc)); } } } @@ -1900,7 +1905,7 @@ public: } private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); @@ -1918,7 +1923,7 @@ private: for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) + if (IoBuffer Payload = GetValueFunc(ValueContentId)) { Payloads.push_back(Payload); } @@ -1970,19 +1975,19 @@ private: for (;;) { - UpstreamCacheRecord CacheRecord; - if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + EnqueuedRequest Request; + if (m_UpstreamQueue.WaitAndDequeue(Request)) { try { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc)); } catch (const std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, + Request.Record.Namespace, + Request.Record.Key.Bucket, + Request.Record.Key.Hash, Err.what()); } } @@ -2076,7 +2081,7 @@ private: LoggerRef Log() { return m_Log; } - using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<EnqueuedRequest>; struct RunState { @@ -2102,7 +2107,6 @@ private: LoggerRef m_Log; UpstreamCacheOptions m_Options; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; @@ -2126,9 +2130,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con } std::unique_ptr<UpstreamCache> -CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) +CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) { - return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore); + return std::make_unique<UpstreamCacheImpl>(Options, CacheStore); } } // namespace zen |