aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp211
1 files changed, 125 insertions, 86 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index baed1b3b6..ac75285c7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -15,6 +15,8 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zenhttp/httpshared.h>
+
#include <zenstore/cidstore.h>
#include <auth/authmgr.h>
@@ -180,7 +182,9 @@ namespace detail {
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord");
@@ -274,21 +278,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -307,12 +310,14 @@ namespace detail {
CbPackage Package;
CbObject Record;
+ double ElapsedSeconds = 0.0;
if (!Result.Error)
{
std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
CloudCacheResult RefResult =
Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
+ ElapsedSeconds = RefResult.ElapsedSeconds;
m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
@@ -340,15 +345,18 @@ namespace detail {
}
}
- OnComplete({.Request = *Request, .Record = Record, .Package = Package});
+ OnComplete(
+ {.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info});
}
return Result;
}
- virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey&,
+ const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
+ ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheChunk");
try
{
@@ -360,21 +368,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -392,6 +399,7 @@ namespace detail {
CacheChunkRequest& Request = *RequestPtr;
IoBuffer Payload;
+ double ElapsedSeconds = 0.0;
CompressedBuffer Compressed;
if (!Result.Error)
{
@@ -400,7 +408,8 @@ namespace detail {
Request.ChunkId == IoHash::Zero
? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId)
: Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
- Payload = BlobResult.Response;
+ ElapsedSeconds = BlobResult.ElapsedSeconds;
+ Payload = BlobResult.Response;
AppendResult(BlobResult, Result);
@@ -413,10 +422,12 @@ namespace detail {
if (Compressed)
{
- OnComplete({.Request = Request,
- .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
- .RawSize = Compressed.GetRawSize(),
- .Value = Payload});
+ OnComplete({.Request = Request,
+ .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
+ .RawSize = Compressed.GetRawSize(),
+ .Value = Payload,
+ .ElapsedSeconds = ElapsedSeconds,
+ .Source = &m_Info});
}
else
{
@@ -441,6 +452,7 @@ namespace detail {
CacheValueRequest& Request = *RequestPtr;
IoBuffer Payload;
+ double ElapsedSeconds = 0.0;
CompressedBuffer Compressed;
if (!Result.Error)
{
@@ -448,7 +460,8 @@ namespace detail {
IoHash PayloadHash;
const CloudCacheResult BlobResult =
Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash);
- Payload = BlobResult.Response;
+ ElapsedSeconds = BlobResult.ElapsedSeconds;
+ Payload = BlobResult.Response;
AppendResult(BlobResult, Result);
@@ -479,10 +492,12 @@ namespace detail {
if (Compressed)
{
- OnComplete({.Request = Request,
- .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
- .RawSize = Compressed.GetRawSize(),
- .Value = Payload});
+ OnComplete({.Request = Request,
+ .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
+ .RawSize = Compressed.GetRawSize(),
+ .Value = Payload,
+ .ElapsedSeconds = ElapsedSeconds,
+ .Source = &m_Info});
}
else
{
@@ -803,7 +818,9 @@ namespace detail {
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord");
@@ -816,21 +833,20 @@ namespace detail {
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -876,7 +892,6 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
ZenCacheResult Result;
{
@@ -888,24 +903,33 @@ namespace detail {
if (Result.Success)
{
- if (BatchResponse.TryLoad(Result.Response))
+ CbPackage BatchResponse;
+ if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse))
{
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (Results.Num() != Requests.size())
{
- ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Response results from Upstream.");
}
else
{
for (size_t Index = 0; CbFieldView Record : Results)
{
CacheKeyRequest* Request = Requests[Index++];
- OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ OnComplete({.Request = *Request,
+ .Record = Record.AsObjectView(),
+ .Package = BatchResponse,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Source = &m_Info});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
+ else
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheRecords invalid Response from Upstream.");
+ }
}
for (CacheKeyRequest* Request : Requests)
@@ -916,36 +940,35 @@ namespace detail {
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
- virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace,
- const CacheKey& CacheKey,
- const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Zen::GetCacheValue");
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunk");
try
{
ZenStructuredCacheSession Session(GetClientRef());
- const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId);
+ const ZenCacheResult Result = Session.GetCacheChunk(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
if (Result.ErrorCode == 0)
{
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success},
+ .Value = Result.Response,
+ .Source = &m_Info};
}
else
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}};
}
}
catch (std::exception& Err)
{
m_Status.Set(UpstreamEndpointState::kError, Err.what());
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}};
}
}
@@ -990,7 +1013,6 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
ZenCacheResult Result;
{
@@ -1002,12 +1024,13 @@ namespace detail {
if (Result.Success)
{
- if (BatchResponse.TryLoad(Result.Response))
+ CbPackage BatchResponse;
+ if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse))
{
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (CacheValueRequests.size() != Results.Num())
{
- ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Response results from Upstream.");
}
else
{
@@ -1043,12 +1066,21 @@ namespace detail {
RawHash = IoHash::Zero;
}
}
- OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)});
+ OnComplete({.Request = Request,
+ .RawHash = RawHash,
+ .RawSize = RawSize,
+ .Value = std::move(Payload),
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Source = &m_Info});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
+ else
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid Response from Upstream.");
+ }
}
for (CacheValueRequest* RequestPtr : CacheValueRequests)
@@ -1116,7 +1148,6 @@ namespace detail {
}
BatchRequest.EndObject();
- CbPackage BatchResponse;
ZenCacheResult Result;
{
@@ -1128,12 +1159,13 @@ namespace detail {
if (Result.Success)
{
- if (BatchResponse.TryLoad(Result.Response))
+ CbPackage BatchResponse;
+ if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse))
{
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (CacheChunkRequests.size() != Results.Num())
{
- ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Response results from Upstream.");
}
else
{
@@ -1169,12 +1201,21 @@ namespace detail {
RawHash = IoHash::Zero;
}
}
- OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)});
+ OnComplete({.Request = Request,
+ .RawHash = RawHash,
+ .RawSize = RawSize,
+ .Value = std::move(Payload),
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Source = &m_Info});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
+ else
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheChunks invalid Response from Upstream.");
+ }
}
for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
@@ -1464,7 +1505,7 @@ public:
}
}
- virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::GetCacheRecord");
@@ -1479,31 +1520,29 @@ public:
continue;
}
- UpstreamEndpointStats& Stats = Endpoint->Stats();
- GetUpstreamCacheResult Result;
- {
- metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type);
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
+ GetUpstreamCacheSingleResult Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type);
+ Scope.Stop();
Stats.CacheGetCount.Increment(1);
- Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+ Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes);
- if (Result.Success)
+ if (Result.Status.Success)
{
Stats.CacheHitCount.Increment(1);
return Result;
}
- if (Result.Error)
+ if (Result.Status.Error)
{
Stats.CacheErrorCount.Increment(1);
ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ Result.Status.Error.Reason,
+ Result.Status.Error.ErrorCode);
}
}
}
@@ -1572,6 +1611,7 @@ public:
}
}
+ const UpstreamEndpointInfo Info;
for (CacheKeyRequest* Request : RemainingKeys)
{
OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()});
@@ -1629,7 +1669,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_WARN("get cache chunks(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1639,17 +1679,18 @@ public:
}
}
+ const UpstreamEndpointInfo Info;
for (CacheChunkRequest* RequestPtr : RemainingKeys)
{
OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
}
}
- virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace,
- const CacheKey& CacheKey,
- const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::GetCacheValue");
+ ZEN_TRACE_CPU("Upstream::GetCacheChunk");
if (m_Options.ReadUpstream)
{
@@ -1660,32 +1701,29 @@ public:
continue;
}
- UpstreamEndpointStats& Stats = Endpoint->Stats();
- GetUpstreamCacheResult Result;
-
- {
- metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId);
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
+ GetUpstreamCacheSingleResult Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId);
+ Scope.Stop();
Stats.CacheGetCount.Increment(1);
- Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+ Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes);
- if (Result.Success)
+ if (Result.Status.Success)
{
Stats.CacheHitCount.Increment(1);
return Result;
}
- if (Result.Error)
+ if (Result.Status.Error)
{
Stats.CacheErrorCount.Increment(1);
- ZEN_WARN("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_WARN("get cache chunk FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ Result.Status.Error.Reason,
+ Result.Status.Error.ErrorCode);
}
}
}
@@ -1754,6 +1792,7 @@ public:
}
}
+ const UpstreamEndpointInfo Info;
for (CacheValueRequest* RequestPtr : RemainingKeys)
{
OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});