diff options
| author | Per Larsson <[email protected]> | 2021-11-02 10:50:18 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-02 10:50:18 +0100 |
| commit | 4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9 (patch) | |
| tree | d1e6fb1c71a0c8c5e12b2814f309d09d7093c9b3 /zenserver/upstream | |
| parent | Merge branch 'main' into zcache-batch (diff) | |
| download | zen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.tar.xz zen-4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9.zip | |
Added upstream batch API.
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 41 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 14 |
2 files changed, 55 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c0cd858b6..437b29cd7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -780,6 +780,47 @@ public: return {}; } + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + OnCacheGetComplete OnComplete) override + { + if (!m_Options.ReadUpstream) + { + return {.Missing = std::vector<size_t>(KeyIndex.begin(), KeyIndex.end())}; + } + + GetUpstreamCacheBatchResult Result; + + for (size_t Idx : KeyIndex) + { + const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash}; + + GetUpstreamCacheResult CacheResult; + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); + if (CacheResult.Success) + { + break; + } + } + } + + if (CacheResult.Success) + { + OnComplete(Idx, CacheResult.Value); + } + else + { + Result.Missing.push_back(Idx); + } + } + + return Result; + } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { if (m_Options.ReadUpstream) diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index edc995da6..04554f210 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -2,12 +2,15 @@ #pragma once +#include "cache/cachekey.h" + #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/zencore.h> #include <atomic> #include <chrono> +#include <functional> #include <memory> namespace zen { @@ -62,6 +65,11 @@ struct GetUpstreamCacheResult bool Success = false; }; +struct GetUpstreamCacheBatchResult +{ + std::vector<size_t> Missing; +}; + struct PutUpstreamCacheResult { std::string Reason; @@ -88,6 +96,8 @@ struct UpstreamEndpointStats std::atomic<double> SecondsDown{}; }; +using OnCacheGetComplete = std::function<void(size_t, IoBuffer)>; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -129,6 +139,10 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + OnCacheGetComplete OnComplete) = 0; + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; struct EnqueueResult |