aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2022-01-28 13:07:36 +0100
committerPer Larsson <[email protected]>2022-01-28 13:07:36 +0100
commitbd43839e042425d72b584b33c7dbb86dabc95e12 (patch)
tree1e663395ac626f3863ef92e95952b3c4245abf76 /zenserver/upstream/upstreamcache.cpp
parentGet access token from auth mgr. (diff)
parentCompile fix (diff)
downloadzen-bd43839e042425d72b584b33c7dbb86dabc95e12.tar.xz
zen-bd43839e042425d72b584b33c7dbb86dabc95e12.zip
Merged main.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp156
1 files changed, 80 insertions, 76 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 68e7edfab..206787bf7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -272,14 +272,14 @@ namespace detail {
return Result;
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload");
+ ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
+ const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -303,11 +303,11 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
@@ -327,7 +327,7 @@ namespace detail {
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
}
- OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
+ OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload});
}
return Result;
@@ -335,11 +335,11 @@ namespace detail {
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
- std::span<IoBuffer const> Payloads) override
+ std::span<IoBuffer const> Values) override
{
ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord");
- ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
const int32_t MaxAttempts = 3;
try
@@ -373,30 +373,31 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
- for (const IoHash& PayloadId : PayloadIds)
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
{
- const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+ const auto It =
+ std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
- if (It == std::end(CacheRecord.PayloadIds))
+ if (It == std::end(CacheRecord.ValueContentIds))
{
- OutReason = fmt::format("payload '{}' MISSING from local cache", PayloadId);
+ OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
return false;
}
- const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+ const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
CloudCacheResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
}
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (!BlobResult.Success)
{
- OutReason = fmt::format("upload payload '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason);
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
return false;
}
@@ -475,7 +476,7 @@ namespace detail {
Sb << MissingHash.ToHexString() << ",";
}
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs payload(s) '{}'",
+ return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
Sb.ToString()),
@@ -538,10 +539,12 @@ namespace detail {
public:
ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
: m_Log(zen::logging::Get("upstream"))
- , m_Info({.Name = std::string(Options.Name)})
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
{
+ ZEN_ASSERT(!Options.Name.empty());
+ m_Info.Name = Options.Name;
+
for (const auto& Url : Options.Urls)
{
m_Endpoints.push_back({.Url = Url});
@@ -649,9 +652,8 @@ namespace detail {
}
BatchRequest.EndArray();
- BatchRequest.BeginObject("Policy"sv);
- CacheRecordPolicy::Save(Policy, BatchRequest);
- BatchRequest.EndObject();
+ BatchRequest.SetName("Policy"sv);
+ Policy.Save(BatchRequest);
}
BatchRequest.EndObject();
@@ -687,14 +689,14 @@ namespace detail {
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload");
+ ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue");
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
+ const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -718,18 +720,18 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
std::vector<size_t> IndexMap;
IndexMap.reserve(RequestIndex.size());
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
- << "GetCachePayloads";
+ << "GetCacheValues";
BatchRequest.BeginObject("Params"sv);
{
@@ -746,12 +748,11 @@ namespace detail {
BatchRequest << "Bucket"sv << Request.Key.Bucket;
BatchRequest << "Hash"sv << Request.Key.Hash;
BatchRequest.EndObject();
-
- BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId);
+ BatchRequest.AddObjectId("ValueId"sv, Request.ValueId);
BatchRequest << "ChunkId"sv << Request.ChunkId;
BatchRequest << "RawOffset"sv << Request.RawOffset;
BatchRequest << "RawSize"sv << Request.RawSize;
- BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy);
+ BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView();
}
BatchRequest.EndObject();
}
@@ -787,7 +788,7 @@ namespace detail {
}
}
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
@@ -796,7 +797,7 @@ namespace detail {
for (size_t Index : RequestIndex)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
}
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
@@ -804,11 +805,11 @@ namespace detail {
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
- std::span<IoBuffer const> Payloads) override
+ std::span<IoBuffer const> Values) override
{
ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord");
- ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
const int32_t MaxAttempts = 3;
try
@@ -823,15 +824,15 @@ namespace detail {
CbPackage Package;
Package.SetObject(CbObject(SharedBuffer(RecordValue)));
- for (const IoBuffer& Payload : Payloads)
+ for (const IoBuffer& Value : Values)
{
- if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value)))
{
Package.AddAttachment(CbAttachment(AttachmentBuffer));
}
else
{
- return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ return {.Reason = std::string("invalid value buffer"), .Success = false};
}
}
@@ -851,15 +852,15 @@ namespace detail {
}
else
{
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)
{
Result.Success = false;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result = Session.PutCachePayload(CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ Result = Session.PutCacheValue(CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ CacheRecord.ValueContentIds[Idx],
+ Values[Idx]);
}
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -869,7 +870,7 @@ namespace detail {
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload",
+ return {.Reason = "Failed to upload value",
.Bytes = TotalBytes,
.ElapsedSeconds = TotalElapsedSeconds,
.Success = false};
@@ -1049,7 +1050,7 @@ public:
virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
+ const CacheRecordPolicy& DownstreamPolicy,
OnCacheRecordGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
@@ -1060,6 +1061,8 @@ public:
if (m_Options.ReadUpstream)
{
+ CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream();
+
for (auto& Endpoint : m_Endpoints)
{
if (RemainingKeys.empty())
@@ -1078,18 +1081,19 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
- if (Params.Record)
- {
- OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ Result =
+ Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
- Stats.CacheHitCount.Increment(1);
- }
- else
- {
- Missing.push_back(Params.KeyIndex);
- }
- });
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
}
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
@@ -1115,11 +1119,11 @@ public:
}
}
- virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::GetCacheValues");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
@@ -1145,10 +1149,10 @@ public:
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) {
- if (Params.Payload)
+ Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ if (Params.Value)
{
- OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+ OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
Stats.CacheHitCount.Increment(1);
}
@@ -1166,7 +1170,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1178,13 +1182,13 @@ public:
for (size_t Index : RemainingKeys)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
}
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::GetCachePayload");
+ ZEN_TRACE_CPU("Upstream::GetCacheValue");
if (m_Options.ReadUpstream)
{
@@ -1200,7 +1204,7 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
+ Result = Endpoint->GetCacheValue(CacheKey, ValueContentId);
}
Stats.CacheGetCount.Increment(1);
@@ -1217,7 +1221,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1302,18 +1306,18 @@ private:
return;
}
- for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ for (const IoHash& ValueContentId : CacheRecord.ValueContentIds)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId))
{
Payloads.push_back(Payload);
}
else
{
- ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS",
+ ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
- PayloadId);
+ ValueContentId);
return;
}
}