aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-02 10:50:18 +0100
committerPer Larsson <[email protected]>2021-11-02 10:50:18 +0100
commit4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9 (patch)
treed1e6fb1c71a0c8c5e12b2814f309d09d7093c9b3 /zenserver/upstream
parentMerge branch 'main' into zcache-batch (diff)
downloadzen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.tar.xz
zen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.zip
Added upstream batch API.
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/upstreamcache.cpp41
-rw-r--r--zenserver/upstream/upstreamcache.h14
2 files changed, 55 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index c0cd858b6..437b29cd7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -780,6 +780,47 @@ public:
return {};
}
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ OnCacheGetComplete OnComplete) override
+ {
+ if (!m_Options.ReadUpstream)
+ {
+ return {.Missing = std::vector<size_t>(KeyIndex.begin(), KeyIndex.end())};
+ }
+
+ GetUpstreamCacheBatchResult Result;
+
+ for (size_t Idx : KeyIndex)
+ {
+ const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash};
+
+ GetUpstreamCacheResult CacheResult;
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage);
+ if (CacheResult.Success)
+ {
+ break;
+ }
+ }
+ }
+
+ if (CacheResult.Success)
+ {
+ OnComplete(Idx, CacheResult.Value);
+ }
+ else
+ {
+ Result.Missing.push_back(Idx);
+ }
+ }
+
+ return Result;
+ }
+
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
{
if (m_Options.ReadUpstream)
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index edc995da6..04554f210 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -2,12 +2,15 @@
#pragma once
+#include "cache/cachekey.h"
+
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/zencore.h>
#include <atomic>
#include <chrono>
+#include <functional>
#include <memory>
namespace zen {
@@ -62,6 +65,11 @@ struct GetUpstreamCacheResult
bool Success = false;
};
+struct GetUpstreamCacheBatchResult
+{
+ std::vector<size_t> Missing;
+};
+
struct PutUpstreamCacheResult
{
std::string Reason;
@@ -88,6 +96,8 @@ struct UpstreamEndpointStats
std::atomic<double> SecondsDown{};
};
+using OnCacheGetComplete = std::function<void(size_t, IoBuffer)>;
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -129,6 +139,10 @@ public:
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ OnCacheGetComplete OnComplete) = 0;
+
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
struct EnqueueResult