diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-26 11:43:37 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-26 11:43:37 +0200 |
| commit | fb137cf9c8b7a9d1659b03472c9591c4863e9173 (patch) | |
| tree | bbd0df09d4b425ef668d22f3b12ea2cb3482bf66 /src/zenserver/upstream/upstreamcache.cpp | |
| parent | Merge pull request #139 from ue-foundation/de/zen-service-command (diff) | |
| download | zen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.tar.xz zen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.zip | |
revert multi-cid store (#475)
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 38 |
1 files changed, 17 insertions, 21 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index a1c460bc0..744b861dd 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1475,17 +1475,12 @@ namespace detail { class UpstreamCacheImpl final : public UpstreamCache { - struct EnqueuedRequest - { - UpstreamCacheRecord Record; - std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc; - }; - public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) + UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) + , m_CidStore(CidStore) { } @@ -1841,17 +1836,17 @@ public: } } - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { - m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc}); + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); } else { - ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc)); + ProcessCacheRecord(std::move(CacheRecord)); } } } @@ -1905,7 +1900,7 @@ public: } private: - void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc) + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); @@ -1923,7 +1918,7 @@ private: for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = GetValueFunc(ValueContentId)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) { Payloads.push_back(Payload); } @@ -1975,19 +1970,19 @@ private: for (;;) { - EnqueuedRequest Request; - if (m_UpstreamQueue.WaitAndDequeue(Request)) + UpstreamCacheRecord CacheRecord; + if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) { try { - ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc)); + ProcessCacheRecord(std::move(CacheRecord)); } catch (const std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", - Request.Record.Namespace, - Request.Record.Key.Bucket, - Request.Record.Key.Hash, + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, Err.what()); } } @@ -2081,7 +2076,7 @@ private: LoggerRef Log() { return m_Log; } - using UpstreamQueue = BlockingQueue<EnqueuedRequest>; + using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; struct RunState { @@ -2107,6 +2102,7 @@ private: LoggerRef m_Log; UpstreamCacheOptions m_Options; ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; @@ -2130,9 +2126,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con } std::unique_ptr<UpstreamCache> -CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) +CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) { - return std::make_unique<UpstreamCacheImpl>(Options, CacheStore); + return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore); } } // namespace zen |