aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-19 11:37:25 +0200
committerDan Engelbrecht <[email protected]>2022-05-19 11:37:25 +0200
commita9130d34b5318b0da5d3547c432a8734213fbe9b (patch)
tree2cdb96f85e221cc24227b410d4d5f8f4e4af7a41 /zenserver/upstream/upstreamcache.cpp
parentMerge pull request #98 from EpicGames/de/fix-bucket-name-rules (diff)
downloadzen-a9130d34b5318b0da5d3547c432a8734213fbe9b.tar.xz
zen-a9130d34b5318b0da5d3547c432a8734213fbe9b.zip
Keep Namespace out of CacheKey and store it on request level
RPC requests now has a Namespace field under Params instead of one Namespace per cache key Fall back to legacy upstream HTTP URI format if default namespace is requested
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp103
1 files changed, 67 insertions, 36 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 52513abe9..98b4439c7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -182,7 +182,7 @@ namespace detail {
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
- virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord");
@@ -191,11 +191,11 @@ namespace detail {
CloudCacheSession Session(m_Client);
CloudCacheResult Result;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
if (m_UseLegacyDdc && Type == ZenContentType::kBinary)
{
- std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace);
+ std::string_view DdcNamespace = GetActualDdcNamespace(Session, Namespace);
Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash);
}
else if (Type == ZenContentType::kCompressedBinary)
@@ -299,7 +299,9 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords");
@@ -314,7 +316,7 @@ namespace detail {
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
CloudCacheResult RefResult =
Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
@@ -351,14 +353,14 @@ namespace detail {
return Result;
}
- virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
try
{
CloudCacheSession Session(m_Client);
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -383,7 +385,8 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests,
+ virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
@@ -399,7 +402,7 @@ namespace detail {
CompressedBuffer Compressed;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
Payload = BlobResult.Response;
@@ -446,7 +449,7 @@ namespace detail {
CloudCacheResult Result;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace);
if (m_UseLegacyDdc)
{
Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue);
@@ -484,6 +487,7 @@ namespace detail {
return PerformStructuredPut(
Session,
+ CacheRecord.Namespace,
CacheRecord.Key,
ReferencingObject.Save().GetBuffer().AsIoBuffer(),
MaxAttempts,
@@ -503,6 +507,7 @@ namespace detail {
{
return PerformStructuredPut(
Session,
+ CacheRecord.Namespace,
CacheRecord.Key,
RecordValue,
MaxAttempts,
@@ -548,6 +553,7 @@ namespace detail {
PutUpstreamCacheResult PerformStructuredPut(
CloudCacheSession& Session,
+ std::string_view Namespace,
const CacheKey& Key,
IoBuffer ObjectBuffer,
const int32_t MaxAttempts,
@@ -556,7 +562,7 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
for (const IoHash& ValueContentId : ValueContentIds)
{
@@ -738,14 +744,14 @@ namespace detail {
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
- virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord");
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -769,20 +775,24 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords");
ZEN_ASSERT(Requests.size() > 0);
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
- << "GetCacheRecords";
+ << "GetCacheRecords"sv;
BatchRequest.BeginObject("Params"sv);
{
CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy();
BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy);
+ BatchRequest << "Namespace"sv << Namespace;
+
BatchRequest.BeginArray("Requests"sv);
for (CacheKeyRequest* Request : Requests)
{
@@ -791,7 +801,6 @@ namespace detail {
const CacheKey& Key = Request->Key;
BatchRequest.BeginObject("Key"sv);
{
- BatchRequest << "Namespace"sv << Key.Namespace;
BatchRequest << "Bucket"sv << Key.Bucket;
BatchRequest << "Hash"sv << Key.Hash;
}
@@ -848,14 +857,16 @@ namespace detail {
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
- virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue");
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId);
+ const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -879,7 +890,8 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests,
+ virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
@@ -887,12 +899,16 @@ namespace detail {
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
- << "GetCacheChunks";
+ << "GetCacheChunks"sv;
+ BatchRequest << "Namespace"sv << Namespace;
BatchRequest.BeginObject("Params"sv);
{
CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy;
BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView();
+
+ BatchRequest << "Namespace"sv << Namespace;
+
BatchRequest.BeginArray("ChunkRequests"sv);
{
for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
@@ -902,7 +918,6 @@ namespace detail {
BatchRequest.BeginObject();
{
BatchRequest.BeginObject("Key"sv);
- BatchRequest << "Namespace"sv << Request.Key.Namespace;
BatchRequest << "Bucket"sv << Request.Key.Bucket;
BatchRequest << "Hash"sv << Request.Key.Hash;
BatchRequest.EndObject();
@@ -1042,7 +1057,11 @@ namespace detail {
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type);
+ Result = Session.PutCacheRecord(CacheRecord.Namespace,
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ PackagePayload,
+ CacheRecord.Type);
}
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -1061,12 +1080,14 @@ namespace detail {
CbPackage BatchPackage;
CbObjectWriter BatchWriter;
BatchWriter << "Method"sv
- << "PutCacheValues";
+ << "PutCacheValues"sv;
BatchWriter.BeginObject("Params"sv);
{
// DefaultPolicy unspecified and expected to be Default
+ BatchWriter << "Namespace"sv << CacheRecord.Namespace;
+
BatchWriter.BeginArray("Requests"sv);
{
BatchWriter.BeginObject();
@@ -1074,7 +1095,6 @@ namespace detail {
const CacheKey& Key = CacheRecord.Key;
BatchWriter.BeginObject("Key"sv);
{
- BatchWriter << "Namespace"sv << Key.Namespace;
BatchWriter << "Bucket"sv << Key.Bucket;
BatchWriter << "Hash"sv << Key.Hash;
}
@@ -1108,7 +1128,8 @@ namespace detail {
Result.Success = false;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result = Session.PutCacheValue(CacheRecord.Key.Bucket,
+ Result = Session.PutCacheValue(CacheRecord.Namespace,
+ CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
CacheRecord.ValueContentIds[Idx],
Values[Idx]);
@@ -1131,7 +1152,11 @@ namespace detail {
Result.Success = false;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type);
+ Result = Session.PutCacheRecord(CacheRecord.Namespace,
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ RecordValue,
+ CacheRecord.Type);
}
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -1259,7 +1284,7 @@ public:
}
}
- virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::GetCacheRecord");
@@ -1278,7 +1303,7 @@ public:
GetUpstreamCacheResult Result;
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecord(CacheKey, Type);
+ Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type);
}
Stats.CacheGetCount.Increment(1);
@@ -1306,7 +1331,9 @@ public:
return {};
}
- virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final
+ virtual void GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
@@ -1334,7 +1361,7 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) {
+ Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) {
if (Params.Record)
{
OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
@@ -1371,7 +1398,9 @@ public:
}
}
- virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final
+ virtual void GetCacheValues(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheValues");
@@ -1399,7 +1428,7 @@ public:
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
if (Params.RawHash != Params.RawHash.Zero)
{
OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
@@ -1436,7 +1465,9 @@ public:
}
}
- virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::GetCacheValue");
@@ -1454,7 +1485,7 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheValue(CacheKey, ValueContentId);
+ Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId);
}
Stats.CacheGetCount.Increment(1);
@@ -1550,7 +1581,7 @@ private:
ZenCacheValue CacheValue;
std::vector<IoBuffer> Payloads;
- if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
+ if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
{
ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash);
return;
@@ -1565,7 +1596,7 @@ private:
else
{
ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS",
- CacheRecord.Key.Namespace,
+ CacheRecord.Namespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
ValueContentId);