aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/jupiter.h4
-rw-r--r--zenserver/upstream/upstreamapply.h33
-rw-r--r--zenserver/upstream/upstreamcache.cpp261
-rw-r--r--zenserver/upstream/upstreamcache.h56
-rw-r--r--zenserver/upstream/zen.cpp18
-rw-r--r--zenserver/upstream/zen.h12
6 files changed, 215 insertions, 169 deletions
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 47fdc4e17..f90ad26ed 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -26,10 +26,10 @@ namespace detail {
struct CloudCacheSessionState;
}
-class IoBuffer;
+class CbObjectView;
class CloudCacheClient;
+class IoBuffer;
struct IoHash;
-class CbObjectView;
/**
* Cached access token, for use with `Authorization:` header
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index e48b67c61..c56a22ac3 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -115,26 +115,20 @@ struct UpstreamApplyEndpointStats
};
/**
- * The upstream apply endpont is responsible for handling remote execution.
+ * The upstream apply endpoint is responsible for handling remote execution.
*/
class UpstreamApplyEndpoint
{
public:
virtual ~UpstreamApplyEndpoint() = default;
- virtual UpstreamEndpointHealth Initialize() = 0;
-
- virtual bool IsHealthy() const = 0;
-
- virtual UpstreamEndpointHealth CheckHealth() = 0;
-
- virtual std::string_view DisplayName() const = 0;
-
- virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0;
-
- virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0;
-
- virtual UpstreamApplyEndpointStats& Stats() = 0;
+ virtual UpstreamEndpointHealth Initialize() = 0;
+ virtual bool IsHealthy() const = 0;
+ virtual UpstreamEndpointHealth CheckHealth() = 0;
+ virtual std::string_view DisplayName() const = 0;
+ virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0;
+ virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0;
+ virtual UpstreamApplyEndpointStats& Stats() = 0;
};
/**
@@ -145,8 +139,7 @@ class UpstreamApply
public:
virtual ~UpstreamApply() = default;
- virtual bool Initialize() = 0;
-
+ virtual bool Initialize() = 0;
virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0;
struct EnqueueResult
@@ -161,11 +154,9 @@ public:
bool Success = false;
};
- virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
-
- virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
-
- virtual void GetStatus(CbObjectWriter& CbO) = 0;
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
};
std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index d83542701..7466af1d2 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -18,6 +18,7 @@
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include "cache/structuredcache.h"
#include <auth/authmgr.h>
#include "cache/structuredcachestore.h"
#include "diag/logging.h"
@@ -240,21 +241,16 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
- OnCacheRecordGetComplete&& OnComplete) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords");
- ZEN_UNUSED(Policy);
-
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
- for (size_t Index : KeyIndex)
+ for (CacheKeyRequest* Request : Requests)
{
- const CacheKey& CacheKey = CacheKeys[Index];
+ const CacheKey& CacheKey = Request->Key;
CbPackage Package;
CbObject Record;
@@ -289,7 +285,7 @@ namespace detail {
}
}
- OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package});
+ OnComplete({.Request = *Request, .Record = Record, .Package = Package});
}
return Result;
@@ -326,20 +322,20 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCacheValueGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
- for (size_t Index : RequestIndex)
+ for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
{
- const CacheChunkRequest& Request = CacheChunkRequests[Index];
- IoBuffer Payload;
+ CacheChunkRequest& Request = *RequestPtr;
+ IoBuffer Payload;
+ CompressedBuffer Compressed;
if (!Result.Error)
{
const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId);
@@ -348,9 +344,23 @@ namespace detail {
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ if (Payload && IsCompressedBinary(Payload.GetContentType()))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ }
}
- OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload});
+ if (Compressed)
+ {
+ OnComplete({.Request = Request,
+ .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
+ .RawSize = Compressed.GetRawSize(),
+ .Value = Payload});
+ }
+ else
+ {
+ OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
}
return Result;
@@ -646,15 +656,10 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
- OnCacheRecordGetComplete&& OnComplete) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords");
-
- std::vector<size_t> IndexMap;
- IndexMap.reserve(KeyIndex.size());
+ ZEN_ASSERT(Requests.size() > 0);
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
@@ -662,21 +667,30 @@ namespace detail {
BatchRequest.BeginObject("Params"sv);
{
- BatchRequest.BeginArray("CacheKeys"sv);
- for (size_t Index : KeyIndex)
- {
- const CacheKey& Key = CacheKeys[Index];
- IndexMap.push_back(Index);
+ CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy();
+ BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy);
+ BatchRequest.BeginArray("Requests"sv);
+ for (CacheKeyRequest* Request : Requests)
+ {
BatchRequest.BeginObject();
- BatchRequest << "Bucket"sv << Key.Bucket;
- BatchRequest << "Hash"sv << Key.Hash;
+ {
+ const CacheKey& Key = Request->Key;
+ BatchRequest.BeginObject("Key"sv);
+ {
+ BatchRequest << "Bucket"sv << Key.Bucket;
+ BatchRequest << "Hash"sv << Key.Hash;
+ }
+ BatchRequest.EndObject();
+ if (!Request->Policy.IsUniform() || Request->Policy.GetRecordPolicy() != DefaultPolicy)
+ {
+ BatchRequest.SetName("Policy"sv);
+ Request->Policy.Save(BatchRequest);
+ }
+ }
BatchRequest.EndObject();
}
BatchRequest.EndArray();
-
- BatchRequest.SetName("Policy"sv);
- Policy.Save(BatchRequest);
}
BatchRequest.EndObject();
@@ -694,19 +708,27 @@ namespace detail {
{
if (BatchResponse.TryLoad(Result.Response))
{
- for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv])
+ CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
+ if (Results.Num() != Requests.size())
{
- const size_t Index = IndexMap[LocalIndex++];
- OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream.");
}
+ else
+ {
+ for (size_t Index = 0; CbFieldView Record : Results)
+ {
+ CacheKeyRequest* Request = Requests[Index++];
+ OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ }
- return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
}
}
- for (size_t Index : KeyIndex)
+ for (CacheKeyRequest* Request : Requests)
{
- OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()});
}
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
@@ -743,27 +765,28 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCacheValueGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
-
- std::vector<size_t> IndexMap;
- IndexMap.reserve(RequestIndex.size());
+ ZEN_ASSERT(!CacheChunkRequests.empty());
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
- << "GetCacheValues";
+ << "GetCacheChunks";
+#if BACKWARDS_COMPATABILITY_JAN2022
+ BatchRequest.AddInteger("MethodVersion"sv, 1);
+#endif
BatchRequest.BeginObject("Params"sv);
{
+ CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy;
+ BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView();
BatchRequest.BeginArray("ChunkRequests"sv);
{
- for (size_t Index : RequestIndex)
+ for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
{
- const CacheChunkRequest& Request = CacheChunkRequests[Index];
- IndexMap.push_back(Index);
+ const CacheChunkRequest& Request = *RequestPtr;
BatchRequest.BeginObject();
{
@@ -771,11 +794,26 @@ namespace detail {
BatchRequest << "Bucket"sv << Request.Key.Bucket;
BatchRequest << "Hash"sv << Request.Key.Hash;
BatchRequest.EndObject();
- BatchRequest.AddObjectId("ValueId"sv, Request.ValueId);
- BatchRequest << "ChunkId"sv << Request.ChunkId;
- BatchRequest << "RawOffset"sv << Request.RawOffset;
- BatchRequest << "RawSize"sv << Request.RawSize;
- BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView();
+ if (Request.ValueId)
+ {
+ BatchRequest.AddObjectId("ValueId"sv, Request.ValueId);
+ }
+ if (Request.ChunkId != Request.ChunkId.Zero)
+ {
+ BatchRequest << "ChunkId"sv << Request.ChunkId;
+ }
+ if (Request.RawOffset != 0)
+ {
+ BatchRequest << "RawOffset"sv << Request.RawOffset;
+ }
+ if (Request.RawSize != UINT64_MAX)
+ {
+ BatchRequest << "RawSize"sv << Request.RawSize;
+ }
+ if (Request.Policy != DefaultPolicy)
+ {
+ BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView();
+ }
}
BatchRequest.EndObject();
}
@@ -798,29 +836,56 @@ namespace detail {
{
if (BatchResponse.TryLoad(Result.Response))
{
- for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv])
+ CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
+ if (CacheChunkRequests.size() != Results.Num())
{
- const size_t Index = IndexMap[LocalIndex++];
- IoBuffer Payload;
-
- if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash()))
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ }
+ else
+ {
+ for (size_t RequestIndex = 0; CbFieldView ChunkField : Results)
{
- if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ CacheChunkRequest& Request = *CacheChunkRequests[RequestIndex++];
+ CbObjectView ChunkObject = ChunkField.AsObjectView();
+ IoHash RawHash = ChunkObject["RawHash"sv].AsHash();
+ IoBuffer Payload;
+ uint64_t RawSize = 0;
+ if (RawHash != IoHash::Zero)
{
- Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ bool Success = false;
+ const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash);
+ if (Attachment)
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ RawSize = Compressed.GetRawSize();
+ Success = true;
+ }
+ }
+ if (!Success)
+ {
+ CbFieldView RawSizeField = ChunkObject["RawSize"sv];
+ RawSize = RawSizeField.AsUInt64();
+ Success = !RawSizeField.HasError();
+ }
+ if (!Success)
+ {
+ RawHash = IoHash::Zero;
+ }
}
+ OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)});
}
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)});
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
-
- return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
- for (size_t Index : RequestIndex)
+ for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
+ OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
}
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
@@ -1071,21 +1136,16 @@ public:
return {};
}
- virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- const CacheRecordPolicy& DownstreamPolicy,
- OnCacheRecordGetComplete&& OnComplete) override final
+ virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
- std::vector<size_t> RemainingKeys(KeyIndex.begin(), KeyIndex.end());
+ std::vector<CacheKeyRequest*> RemainingKeys(Requests.begin(), Requests.end());
if (m_Options.ReadUpstream)
{
- CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream();
-
for (auto& Endpoint : m_Endpoints)
{
if (RemainingKeys.empty())
@@ -1098,25 +1158,24 @@ public:
continue;
}
- UpstreamEndpointStats& Stats = Endpoint->Stats();
- std::vector<size_t> Missing;
- GetUpstreamCacheResult Result;
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ std::vector<CacheKeyRequest*> Missing;
+ GetUpstreamCacheResult Result;
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result =
- Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) {
- if (Params.Record)
- {
- OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
- Stats.CacheHitCount.Increment(1);
- }
- else
- {
- Missing.push_back(Params.KeyIndex);
- }
- });
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(&Params.Request);
+ }
+ });
}
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
@@ -1136,21 +1195,19 @@ public:
}
}
- for (size_t Index : RemainingKeys)
+ for (CacheKeyRequest* Request : RemainingKeys)
{
- OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()});
}
}
- virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCacheValueGetComplete&& OnComplete) override final
+ virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheValues");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
- std::vector<size_t> RemainingKeys(RequestIndex.begin(), RequestIndex.end());
+ std::vector<CacheChunkRequest*> RemainingKeys(CacheChunkRequests.begin(), CacheChunkRequests.end());
if (m_Options.ReadUpstream)
{
@@ -1166,14 +1223,14 @@ public:
continue;
}
- UpstreamEndpointStats& Stats = Endpoint->Stats();
- std::vector<size_t> Missing;
- GetUpstreamCacheResult Result;
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ std::vector<CacheChunkRequest*> Missing;
+ GetUpstreamCacheResult Result;
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
- if (Params.Value)
+ Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ if (Params.RawHash != Params.RawHash.Zero)
{
OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
@@ -1181,7 +1238,7 @@ public:
}
else
{
- Missing.push_back(Params.RequestIndex);
+ Missing.push_back(&Params.Request);
}
});
}
@@ -1203,9 +1260,9 @@ public:
}
}
- for (size_t Index : RemainingKeys)
+ for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
+ OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
}
}
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 48601c879..4ccc56f79 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -2,6 +2,8 @@
#pragma once
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/stats.h>
@@ -12,9 +14,11 @@
#include <chrono>
#include <functional>
#include <memory>
+#include <vector>
namespace zen {
+class CbObjectView;
class AuthMgr;
class CbObjectView;
class CbPackage;
@@ -67,8 +71,7 @@ struct PutUpstreamCacheResult
struct CacheRecordGetCompleteParams
{
- const CacheKey& Key;
- size_t KeyIndex = ~size_t(0);
+ CacheKeyRequest& Request;
const CbObjectView& Record;
const CbPackage& Package;
};
@@ -77,9 +80,10 @@ using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams
struct CacheValueGetCompleteParams
{
- const CacheChunkRequest& Request;
- size_t RequestIndex{~size_t(0)};
- IoBuffer Value;
+ CacheChunkRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
};
using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
@@ -145,33 +149,26 @@ struct UpstreamEndpointInfo
};
/**
- * The upstream endpont is responsible for handling upload/downloading of cache records.
+ * The upstream endpoint is responsible for handling upload/downloading of cache records.
*/
class UpstreamEndpoint
{
public:
virtual ~UpstreamEndpoint() = default;
- virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;
-
virtual UpstreamEndpointStatus Initialize() = 0;
- virtual UpstreamEndpointState GetState() = 0;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;
+ virtual UpstreamEndpointState GetState() = 0;
virtual UpstreamEndpointStatus GetStatus() = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
-
- virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
- OnCacheRecordGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0;
virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
-
- virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCacheValueGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheValueGetComplete&& OnComplete) = 0;
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
@@ -190,22 +187,14 @@ public:
virtual void Initialize() = 0;
- virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
-
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
-
- virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- const CacheRecordPolicy& RecordPolicy,
- OnCacheRecordGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
+ virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
-
- virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCacheValueGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
+ virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0;
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
@@ -214,6 +203,9 @@ public:
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
+
+std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options,
const UpstreamAuthConfig& AuthConfig,
AuthMgr& Mgr);
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index a2666ac02..0570dd316 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -66,17 +66,17 @@ namespace detail {
// Note that currently this just implements an UDP echo service for testing purposes
-Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId())
+MeshTracker::MeshTracker(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId())
{
}
-Mesh::~Mesh()
+MeshTracker::~MeshTracker()
{
Stop();
}
void
-Mesh::Start(uint16_t Port)
+MeshTracker::Start(uint16_t Port)
{
ZEN_ASSERT(Port);
ZEN_ASSERT(m_Port == 0);
@@ -87,7 +87,7 @@ Mesh::Start(uint16_t Port)
};
void
-Mesh::Stop()
+MeshTracker::Stop()
{
using namespace std::literals;
@@ -118,7 +118,7 @@ Mesh::Stop()
}
void
-Mesh::EnqueueTick()
+MeshTracker::EnqueueTick()
{
m_Timer.expires_after(std::chrono::seconds(10));
@@ -138,7 +138,7 @@ Mesh::EnqueueTick()
}
void
-Mesh::OnTick()
+MeshTracker::OnTick()
{
using namespace std::literals;
@@ -156,7 +156,7 @@ Mesh::OnTick()
}
void
-Mesh::BroadcastPacket(CbObjectWriter& Obj)
+MeshTracker::BroadcastPacket(CbObjectWriter& Obj)
{
std::error_code ErrorCode;
@@ -201,7 +201,7 @@ Mesh::BroadcastPacket(CbObjectWriter& Obj)
}
void
-Mesh::Run()
+MeshTracker::Run()
{
m_State = kRunning;
@@ -212,7 +212,7 @@ Mesh::Run()
}
void
-Mesh::IssueReceive()
+MeshTracker::IssueReceive()
{
using namespace std::literals;
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 8cc4c121d..bc8fd3c56 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -34,12 +34,16 @@ class ZenStructuredCacheClient;
/** Zen mesh tracker
*
* Discovers and tracks local peers
+ *
+ * NOTE: This is currently experimental, and not very useful yet
+ *
*/
-class Mesh
+
+class MeshTracker
{
public:
- Mesh(asio::io_context& IoContext);
- ~Mesh();
+ MeshTracker(asio::io_context& IoContext);
+ ~MeshTracker();
void Start(uint16_t Port);
void Stop();
@@ -86,6 +90,8 @@ private:
tsl::robin_map<Oid, PeerInfo, Oid::Hasher> m_KnownPeers;
};
+//////////////////////////////////////////////////////////////////////////
+
namespace detail {
struct ZenCacheSessionState;
}