aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-08-26 11:43:37 +0200
committerGitHub Enterprise <[email protected]>2025-08-26 11:43:37 +0200
commitfb137cf9c8b7a9d1659b03472c9591c4863e9173 (patch)
treebbd0df09d4b425ef668d22f3b12ea2cb3482bf66 /src/zenserver/upstream/upstreamcache.cpp
parentMerge pull request #139 from ue-foundation/de/zen-service-command (diff)
downloadzen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.tar.xz
zen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.zip
revert multi-cid store (#475)
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp38
1 files changed, 17 insertions, 21 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index a1c460bc0..744b861dd 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1475,17 +1475,12 @@ namespace detail {
class UpstreamCacheImpl final : public UpstreamCache
{
- struct EnqueuedRequest
- {
- UpstreamCacheRecord Record;
- std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc;
- };
-
public:
- UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore)
+ UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
: m_Log(logging::Get("upstream"))
, m_Options(Options)
, m_CacheStore(CacheStore)
+ , m_CidStore(CidStore)
{
}
@@ -1841,17 +1836,17 @@ public:
}
}
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0)
{
if (!m_UpstreamThreads.empty())
{
- m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc});
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
}
else
{
- ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc));
+ ProcessCacheRecord(std::move(CacheRecord));
}
}
}
@@ -1905,7 +1900,7 @@ public:
}
private:
- void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc)
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
{
ZEN_TRACE_CPU("Upstream::ProcessCacheRecord");
@@ -1923,7 +1918,7 @@ private:
for (const IoHash& ValueContentId : CacheRecord.ValueContentIds)
{
- if (IoBuffer Payload = GetValueFunc(ValueContentId))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId))
{
Payloads.push_back(Payload);
}
@@ -1975,19 +1970,19 @@ private:
for (;;)
{
- EnqueuedRequest Request;
- if (m_UpstreamQueue.WaitAndDequeue(Request))
+ UpstreamCacheRecord CacheRecord;
+ if (m_UpstreamQueue.WaitAndDequeue(CacheRecord))
{
try
{
- ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc));
+ ProcessCacheRecord(std::move(CacheRecord));
}
catch (const std::exception& Err)
{
ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'",
- Request.Record.Namespace,
- Request.Record.Key.Bucket,
- Request.Record.Key.Hash,
+ CacheRecord.Namespace,
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
Err.what());
}
}
@@ -2081,7 +2076,7 @@ private:
LoggerRef Log() { return m_Log; }
- using UpstreamQueue = BlockingQueue<EnqueuedRequest>;
+ using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
struct RunState
{
@@ -2107,6 +2102,7 @@ private:
LoggerRef m_Log;
UpstreamCacheOptions m_Options;
ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
UpstreamQueue m_UpstreamQueue;
std::shared_mutex m_EndpointsMutex;
std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
@@ -2130,9 +2126,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con
}
std::unique_ptr<UpstreamCache>
-CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore)
+CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
{
- return std::make_unique<UpstreamCacheImpl>(Options, CacheStore);
+ return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore);
}
} // namespace zen