aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/upstreamcache.cpp132
-rw-r--r--zenserver/upstream/upstreamcache.h85
2 files changed, 117 insertions, 100 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 1aa195469..c1c0395e7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -180,7 +180,9 @@ namespace detail {
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord");
@@ -274,21 +276,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -342,13 +343,16 @@ namespace detail {
}
}
- OnComplete({.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds});
+ OnComplete(
+ {.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info});
}
return Result;
}
- virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey&,
+ const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheChunk");
@@ -362,21 +366,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -418,7 +421,8 @@ namespace detail {
.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
.RawSize = Compressed.GetRawSize(),
.Value = Payload,
- .ElapsedSeconds = ElapsedSeconds});
+ .ElapsedSeconds = ElapsedSeconds,
+ .Source = &m_Info});
}
else
{
@@ -487,7 +491,8 @@ namespace detail {
.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
.RawSize = Compressed.GetRawSize(),
.Value = Payload,
- .ElapsedSeconds = ElapsedSeconds});
+ .ElapsedSeconds = ElapsedSeconds,
+ .Source = &m_Info});
}
else
{
@@ -808,7 +813,9 @@ namespace detail {
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord");
@@ -821,21 +828,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -908,7 +914,8 @@ namespace detail {
OnComplete({.Request = *Request,
.Record = Record.AsObjectView(),
.Package = BatchResponse,
- .ElapsedSeconds = Result.ElapsedSeconds});
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Source = &m_Info});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
@@ -924,9 +931,9 @@ namespace detail {
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
- virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace,
- const CacheKey& CacheKey,
- const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunk");
@@ -939,21 +946,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -1055,7 +1061,8 @@ namespace detail {
.RawHash = RawHash,
.RawSize = RawSize,
.Value = std::move(Payload),
- .ElapsedSeconds = Result.ElapsedSeconds});
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Source = &m_Info});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
@@ -1185,7 +1192,8 @@ namespace detail {
.RawHash = RawHash,
.RawSize = RawSize,
.Value = std::move(Payload),
- .ElapsedSeconds = Result.ElapsedSeconds});
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Source = &m_Info});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
@@ -1480,7 +1488,7 @@ public:
}
}
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::GetCacheRecord");
@@ -1495,31 +1503,29 @@ public:
continue;
}
- UpstreamEndpointStats& Stats = Endpoint->Stats();
- GetUpstreamCacheResult Result;
- {
- metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type);
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
+ GetUpstreamCacheSingleResult Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type);
+ Scope.Stop();
Stats.CacheGetCount.Increment(1);
- Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+ Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes);
- if (Result.Success)
+ if (Result.Status.Success)
{
Stats.CacheHitCount.Increment(1);
return Result;
}
- if (Result.Error)
+ if (Result.Status.Error)
{
Stats.CacheErrorCount.Increment(1);
ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ Result.Status.Error.Reason,
+ Result.Status.Error.ErrorCode);
}
}
}
@@ -1588,6 +1594,7 @@ public:
}
}
+ const UpstreamEndpointInfo Info;
for (CacheKeyRequest* Request : RemainingKeys)
{
OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()});
@@ -1655,15 +1662,16 @@ public:
}
}
+ const UpstreamEndpointInfo Info;
for (CacheChunkRequest* RequestPtr : RemainingKeys)
{
OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
}
}
- virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace,
- const CacheKey& CacheKey,
- const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::GetCacheChunk");
@@ -1676,32 +1684,29 @@ public:
continue;
}
- UpstreamEndpointStats& Stats = Endpoint->Stats();
- GetUpstreamCacheResult Result;
-
- {
- metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId);
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
+ GetUpstreamCacheSingleResult Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId);
+ Scope.Stop();
Stats.CacheGetCount.Increment(1);
- Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+ Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes);
- if (Result.Success)
+ if (Result.Status.Success)
{
Stats.CacheHitCount.Increment(1);
return Result;
}
- if (Result.Error)
+ if (Result.Status.Error)
{
Stats.CacheErrorCount.Increment(1);
ZEN_WARN("get cache chunk FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ Result.Status.Error.Reason,
+ Result.Status.Error.ErrorCode);
}
}
}
@@ -1770,6 +1775,7 @@ public:
}
}
+ const UpstreamEndpointInfo Info;
for (CacheValueRequest* RequestPtr : RemainingKeys)
{
OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 108c097da..695c06b32 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -53,15 +53,27 @@ struct UpstreamError
explicit operator bool() const { return ErrorCode != 0; }
};
+struct UpstreamEndpointInfo
+{
+ std::string Name;
+ std::string Url;
+};
+
struct GetUpstreamCacheResult
{
- IoBuffer Value;
UpstreamError Error{};
int64_t Bytes{};
double ElapsedSeconds{};
bool Success = false;
};
+struct GetUpstreamCacheSingleResult
+{
+ GetUpstreamCacheResult Status;
+ IoBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
struct PutUpstreamCacheResult
{
std::string Reason;
@@ -72,32 +84,35 @@ struct PutUpstreamCacheResult
struct CacheRecordGetCompleteParams
{
- CacheKeyRequest& Request;
- const CbObjectView& Record;
- const CbPackage& Package;
- double ElapsedSeconds{};
+ CacheKeyRequest& Request;
+ const CbObjectView& Record;
+ const CbPackage& Package;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
};
using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
struct CacheValueGetCompleteParams
{
- CacheValueRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
+ CacheValueRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
};
using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
struct CacheChunkGetCompleteParams
{
- CacheChunkRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
+ CacheChunkRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
};
using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
@@ -156,12 +171,6 @@ struct UpstreamEndpointStatus
UpstreamEndpointState State;
};
-struct UpstreamEndpointInfo
-{
- std::string Name;
- std::string Url;
-};
-
/**
* The upstream endpoint is responsible for handling upload/downloading of cache records.
*/
@@ -177,19 +186,19 @@ public:
virtual UpstreamEndpointState GetState() = 0;
virtual UpstreamEndpointStatus GetStatus() = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
- virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace,
- std::span<CacheKeyRequest*> Requests,
- OnCacheRecordGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
std::span<CacheValueRequest*> CacheValueRequests,
OnCacheValueGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
- virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheChunksGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
@@ -217,19 +226,21 @@ public:
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
- virtual void GetCacheRecords(std::string_view Namespace,
- std::span<CacheKeyRequest*> Requests,
- OnCacheRecordGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
+ virtual void GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
virtual void GetCacheValues(std::string_view Namespace,
std::span<CacheValueRequest*> CacheValueRequests,
OnCacheValueGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
- virtual void GetCacheChunks(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheChunksGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) = 0;
+ virtual void GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;