aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-08-20 12:33:03 +0200
committerGitHub Enterprise <[email protected]>2025-08-20 12:33:03 +0200
commit4c05d1041461b630cd5770dae5e8d03147d5674b (patch)
tree3f5d6b1b4b2b3f167f94e98f902a5f60c2e3d753 /src/zenserver/upstream/upstreamcache.cpp
parentzen print fixes/improvements (#469) (diff)
downloadzen-4c05d1041461b630cd5770dae5e8d03147d5674b.tar.xz
zen-4c05d1041461b630cd5770dae5e8d03147d5674b.zip
per namespace/project cas prep refactor (#470)
- Refactor so we can have more than one cas store for project store and cache. - Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore - Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub). - Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize) - Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace`
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp38
1 files changed, 21 insertions, 17 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 744b861dd..a1c460bc0 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1475,12 +1475,17 @@ namespace detail {
class UpstreamCacheImpl final : public UpstreamCache
{
+ struct EnqueuedRequest
+ {
+ UpstreamCacheRecord Record;
+ std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc;
+ };
+
public:
- UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+ UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore)
: m_Log(logging::Get("upstream"))
, m_Options(Options)
, m_CacheStore(CacheStore)
- , m_CidStore(CidStore)
{
}
@@ -1836,17 +1841,17 @@ public:
}
}
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0)
{
if (!m_UpstreamThreads.empty())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc});
}
else
{
- ProcessCacheRecord(std::move(CacheRecord));
+ ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc));
}
}
}
@@ -1900,7 +1905,7 @@ public:
}
private:
- void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc)
{
ZEN_TRACE_CPU("Upstream::ProcessCacheRecord");
@@ -1918,7 +1923,7 @@ private:
for (const IoHash& ValueContentId : CacheRecord.ValueContentIds)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId))
+ if (IoBuffer Payload = GetValueFunc(ValueContentId))
{
Payloads.push_back(Payload);
}
@@ -1970,19 +1975,19 @@ private:
for (;;)
{
- UpstreamCacheRecord CacheRecord;
- if (m_UpstreamQueue.WaitAndDequeue(CacheRecord))
+ EnqueuedRequest Request;
+ if (m_UpstreamQueue.WaitAndDequeue(Request))
{
try
{
- ProcessCacheRecord(std::move(CacheRecord));
+ ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc));
}
catch (const std::exception& Err)
{
ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'",
- CacheRecord.Namespace,
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
+ Request.Record.Namespace,
+ Request.Record.Key.Bucket,
+ Request.Record.Key.Hash,
Err.what());
}
}
@@ -2076,7 +2081,7 @@ private:
LoggerRef Log() { return m_Log; }
- using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
+ using UpstreamQueue = BlockingQueue<EnqueuedRequest>;
struct RunState
{
@@ -2102,7 +2107,6 @@ private:
LoggerRef m_Log;
UpstreamCacheOptions m_Options;
ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
UpstreamQueue m_UpstreamQueue;
std::shared_mutex m_EndpointsMutex;
std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
@@ -2126,9 +2130,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con
}
std::unique_ptr<UpstreamCache>
-CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore)
{
- return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore);
+ return std::make_unique<UpstreamCacheImpl>(Options, CacheStore);
}
} // namespace zen