aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/upstream')
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp6
-rw-r--r--src/zenserver/upstream/upstreamcache.h128
-rw-r--r--src/zenserver/upstream/zen.cpp4
3 files changed, 25 insertions, 113 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 6dde0a701..dac29c273 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -16,10 +16,10 @@
#include <zencore/trace.h>
#include <zenhttp/auth/authmgr.h>
-#include <zenhttp/httpshared.h>
#include <zenstore/cidstore.h>
+#include <zenutil/packageformat.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "cache/httpstructuredcache.h"
#include "diag/logging.h"
@@ -2129,7 +2129,7 @@ UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options,
}
std::unique_ptr<UpstreamCache>
-UpstreamCache::Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
{
return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore);
}
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index 291e7e95e..bb0193e4e 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -8,6 +8,7 @@
#include <zencore/iohash.h>
#include <zencore/stats.h>
#include <zencore/zencore.h>
+#include <zenstore/cache/upstreamcacheclient.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -29,95 +30,6 @@ struct CloudCacheClientOptions;
class CloudCacheTokenProvider;
struct ZenStructuredCacheClientOptions;
-struct UpstreamCacheRecord
-{
- ZenContentType Type = ZenContentType::kBinary;
- std::string Namespace;
- CacheKey Key;
- std::vector<IoHash> ValueContentIds;
- CacheRequestContext Context;
-};
-
-struct UpstreamCacheOptions
-{
- std::chrono::seconds HealthCheckInterval{5};
- uint32_t ThreadCount = 4;
- bool ReadUpstream = true;
- bool WriteUpstream = true;
-};
-
-struct UpstreamError
-{
- int32_t ErrorCode{};
- std::string Reason{};
-
- explicit operator bool() const { return ErrorCode != 0; }
-};
-
-struct UpstreamEndpointInfo
-{
- std::string Name;
- std::string Url;
-};
-
-struct GetUpstreamCacheResult
-{
- UpstreamError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
-};
-
-struct GetUpstreamCacheSingleResult
-{
- GetUpstreamCacheResult Status;
- IoBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-struct PutUpstreamCacheResult
-{
- std::string Reason;
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
-};
-
-struct CacheRecordGetCompleteParams
-{
- CacheKeyRequest& Request;
- const CbObjectView& Record;
- const CbPackage& Package;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
-
-struct CacheValueGetCompleteParams
-{
- CacheValueRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
-
-struct CacheChunkGetCompleteParams
-{
- CacheChunkRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
-
struct UpstreamEndpointStats
{
metrics::OperationTiming CacheGetRequestTiming;
@@ -172,6 +84,13 @@ struct UpstreamEndpointStatus
UpstreamEndpointState State;
};
+struct GetUpstreamCacheSingleResult
+{
+ GetUpstreamCacheResult Status;
+ IoBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
/**
* The upstream endpoint is responsible for handling upload/downloading of cache records.
*/
@@ -217,39 +136,32 @@ public:
/**
* Manages one or more upstream cache endpoints.
*/
-class UpstreamCache
+
+class UpstreamCache : public UpstreamCacheClient
{
public:
- virtual ~UpstreamCache() = default;
-
virtual void Initialize() = 0;
- virtual bool IsActive() = 0;
-
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
- virtual void GetCacheRecords(std::string_view Namespace,
- std::span<CacheKeyRequest*> Requests,
- OnCacheRecordGetComplete&& OnComplete) = 0;
-
- virtual void GetCacheValues(std::string_view Namespace,
- std::span<CacheValueRequest*> CacheValueRequests,
- OnCacheValueGetComplete&& OnComplete) = 0;
virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
const CacheKey& CacheKey,
- const IoHash& ValueContentId) = 0;
- virtual void GetCacheChunks(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheChunksGetComplete&& OnComplete) = 0;
-
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ const IoHash& ValueContentId) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
- static std::unique_ptr<UpstreamCache> Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+struct UpstreamCacheOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
};
+std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+
} // namespace zen
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index afc3b8438..c031a4086 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -10,9 +10,9 @@
#include <zencore/stream.h>
#include <zenhttp/formatters.h>
#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "diag/logging.h"
ZEN_THIRD_PARTY_INCLUDES_START