aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormattpetersepic <[email protected]>2022-01-27 10:52:07 -0700
committerGitHub <[email protected]>2022-01-27 10:52:07 -0700
commitfb1afc87a436ff6374daeaef5f7682f098a85cd7 (patch)
tree89810fc5d85ab81cce7fdc4e26849ad0d25cf2d1
parentAdd batched CacheRecord put rpc (#38) (diff)
downloadzen-fb1afc87a436ff6374daeaef5f7682f098a85cd7.tar.xz
zen-fb1afc87a436ff6374daeaef5f7682f098a85cd7.zip
Rename Paylod to Value to match the client side. Rename PayloadId to ValueContentId where its a hash instead of an oid.
-rw-r--r--zenserver/cache/structuredcache.cpp99
-rw-r--r--zenserver/cache/structuredcache.h10
-rw-r--r--zenserver/upstream/upstreamcache.cpp111
-rw-r--r--zenserver/upstream/upstreamcache.h24
-rw-r--r--zenserver/upstream/zen.cpp8
-rw-r--r--zenserver/upstream/zen.h4
-rw-r--r--zenutil/include/zenutil/cache/cachekey.h6
7 files changed, 134 insertions, 128 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index cb29b3645..c2cd679b1 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -142,13 +142,13 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams());
- if (Ref.PayloadId == IoHash::Zero)
+ if (Ref.ValueContentId == IoHash::Zero)
{
return HandleCacheRecordRequest(Request, Ref, PolicyFromURL);
}
else
{
- return HandleCachePayloadRequest(Request, Ref, PolicyFromURL);
+ return HandleCacheValueRequest(Request, Ref, PolicyFromURL);
}
return;
@@ -511,8 +511,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -593,8 +594,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -606,16 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
switch (Request.RequestVerb())
{
case HttpVerb::kHead:
case HttpVerb::kGet:
- HandleGetCachePayload(Request, Ref, PolicyFromURL);
+ HandleGetCacheValue(Request, Ref, PolicyFromURL);
break;
case HttpVerb::kPut:
- HandlePutCachePayload(Request, Ref, PolicyFromURL);
+ HandlePutCacheValue(Request, Ref, PolicyFromURL);
break;
default:
break;
@@ -623,16 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
bool InUpstreamCache = false;
CachePolicy Policy = PolicyFromURL;
- const bool QueryUpstream = !Payload && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
+ const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
@@ -646,9 +649,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
}
}
- if (!Payload)
+ if (!Value)
{
- ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType()));
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -656,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
Ref.HashKey,
- Ref.PayloadId,
- NiceBytes(Payload.Size()),
- ToString(Payload.GetContentType()),
+ Ref.ValueContentId,
+ NiceBytes(Value.Size()),
+ ToString(Value.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
m_CacheStats.HitCount++;
@@ -673,12 +676,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
}
else
{
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
}
}
void
-HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
// Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
ZEN_UNUSED(PolicyFromURL);
@@ -699,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId)
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueId does not match attachment hash"sv);
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "ValueContentId does not match attachment hash"sv);
}
CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
@@ -709,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
Ref.HashKey,
- Ref.PayloadId,
+ Ref.ValueContentId,
NiceBytes(Body.Size()),
ToString(Body.GetContentType()),
Result.New ? "NEW" : "OLD");
@@ -738,13 +743,13 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
std::string_view HashSegment;
- std::string_view PayloadSegment;
+ std::string_view ValueSegment;
- std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/');
+ std::string_view::size_type ValueSplitOffset = Key.find_last_of('/');
// We know there is a slash so no need to check for npos return
- if (PayloadSplitOffset == BucketSplitOffset)
+ if (ValueSplitOffset == BucketSplitOffset)
{
// Basic cache record lookup
HashSegment = Key.substr(BucketSplitOffset + 1);
@@ -752,8 +757,8 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
else
{
// Cache record + valueid lookup
- HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1);
- PayloadSegment = Key.substr(PayloadSplitOffset + 1);
+ HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1);
+ ValueSegment = Key.substr(ValueSplitOffset + 1);
}
if (HashSegment.size() != IoHash::StringLength)
@@ -761,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
return false;
}
- if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength)
+ if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength)
{
- const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash);
+ const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash);
if (!IsOk)
{
@@ -772,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
else
{
- OutRef.PayloadId = IoHash::Zero;
+ OutRef.ValueContentId = IoHash::Zero;
}
const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
@@ -826,7 +831,7 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
else if (Method == "GetCacheValues"sv)
{
- HandleRpcGetCachePayloads(AsyncRequest, Object);
+ HandleRpcGetCacheValues(AsyncRequest, Object);
}
else
{
@@ -1211,9 +1216,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
- ZEN_TRACE_CPU("Z$::RpcGetCachePayloads");
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
@@ -1228,7 +1233,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
- const Oid PayloadId = RequestObject["ValueId"sv].AsObjectId();
+ const Oid ValueId = RequestObject["ValueId"sv].AsObjectId();
const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
std::string_view PolicyText = RequestObject["Policy"sv].AsString();
@@ -1237,7 +1242,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
// Note we could use emplace_back here but [Apple] LLVM-12's C++ library
// can't infer a constructor like other platforms (or can't handle an
// initializer list like others do).
- ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, ChunkPolicy});
+ ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy});
}
if (ChunkRequests.empty())
@@ -1250,8 +1255,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
// Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
// is missing, load the cache record and try to find the raw hash from the ValueId.
{
- const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
- if (PayloadId)
+ const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash {
+ if (ValueId)
{
// A valid ValueId indicates that the caller is searching for a Value in a Record
// that was Put with ICacheStore::Put
@@ -1260,7 +1265,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView ValueObject = ValueView.AsObjectView();
const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return ValueObject["RawHash"sv].AsHash();
}
@@ -1270,7 +1275,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (CbObjectView ValueObject = Record["Value"sv].AsObjectView())
{
const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return ValueObject["RawHash"sv].AsHash();
}
@@ -1281,7 +1286,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView AttachmentObject = AttachmentView.AsObjectView();
const Oid Id = AttachmentObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return AttachmentObject["RawHash"sv].AsHash();
}
@@ -1319,7 +1324,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (CurrentRecordBuffer)
{
- ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId);
}
}
}
@@ -1366,8 +1371,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (!UpstreamRequests.empty())
{
- const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) {
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
+ const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)))
{
m_CidStore.AddChunk(Compressed);
@@ -1375,11 +1380,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
Params.Request.Key.Bucket,
Params.Request.Key.Hash,
Params.Request.ChunkId,
- NiceBytes(Params.Payload.GetSize()),
+ NiceBytes(Params.Value.GetSize()),
"UPSTREAM");
ZEN_ASSERT(Params.RequestIndex < Chunks.size());
- Chunks[Params.RequestIndex] = std::move(Params.Payload);
+ Chunks[Params.RequestIndex] = std::move(Params.Value);
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -1391,7 +1396,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
};
- m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
+ m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete));
}
CbPackage RpcResponse;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 5b6b10898..88bf6cda1 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -74,7 +74,7 @@ private:
{
std::string BucketSegment;
IoHash HashKey;
- IoHash PayloadId;
+ IoHash ValueContentId;
};
struct CacheStats
@@ -94,13 +94,13 @@ private:
void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
void HandleRpcRequest(zen::HttpServerRequest& Request);
void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 091406db3..657cfb729 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -270,14 +270,14 @@ namespace detail {
return Result;
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload");
+ ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
+ const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -301,11 +301,11 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
@@ -325,7 +325,7 @@ namespace detail {
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
}
- OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
+ OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload});
}
return Result;
@@ -333,11 +333,11 @@ namespace detail {
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
- std::span<IoBuffer const> Payloads) override
+ std::span<IoBuffer const> Values) override
{
ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord");
- ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
const int32_t MaxAttempts = 3;
try
@@ -371,30 +371,31 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
- for (const IoHash& PayloadId : PayloadIds)
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
{
- const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+ const auto It =
+ std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
- if (It == std::end(CacheRecord.PayloadIds))
+ if (It == std::end(CacheRecord.ValueContentIds))
{
- OutReason = fmt::format("value '{}' MISSING from local cache", PayloadId);
+ OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
return false;
}
- const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+ const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
CloudCacheResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
}
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (!BlobResult.Success)
{
- OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason);
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
return false;
}
@@ -685,14 +686,14 @@ namespace detail {
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload");
+ ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue");
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
+ const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -716,11 +717,11 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
std::vector<size_t> IndexMap;
IndexMap.reserve(RequestIndex.size());
@@ -744,7 +745,7 @@ namespace detail {
BatchRequest << "Bucket"sv << Request.Key.Bucket;
BatchRequest << "Hash"sv << Request.Key.Hash;
BatchRequest.EndObject();
- BatchRequest.AddObjectId("ValueId"sv, Request.PayloadId);
+ BatchRequest.AddObjectId("ValueId"sv, Request.ValueId);
BatchRequest << "ChunkId"sv << Request.ChunkId;
BatchRequest << "RawOffset"sv << Request.RawOffset;
BatchRequest << "RawSize"sv << Request.RawSize;
@@ -784,7 +785,7 @@ namespace detail {
}
}
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
@@ -793,7 +794,7 @@ namespace detail {
for (size_t Index : RequestIndex)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
}
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
@@ -801,11 +802,11 @@ namespace detail {
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
- std::span<IoBuffer const> Payloads) override
+ std::span<IoBuffer const> Values) override
{
ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord");
- ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
const int32_t MaxAttempts = 3;
try
@@ -820,9 +821,9 @@ namespace detail {
CbPackage Package;
Package.SetObject(CbObject(SharedBuffer(RecordValue)));
- for (const IoBuffer& Payload : Payloads)
+ for (const IoBuffer& Value : Values)
{
- if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value)))
{
Package.AddAttachment(CbAttachment(AttachmentBuffer));
}
@@ -848,15 +849,15 @@ namespace detail {
}
else
{
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)
{
Result.Success = false;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result = Session.PutCachePayload(CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ Result = Session.PutCacheValue(CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ CacheRecord.ValueContentIds[Idx],
+ Values[Idx]);
}
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -866,7 +867,7 @@ namespace detail {
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload",
+ return {.Reason = "Failed to upload value",
.Bytes = TotalBytes,
.ElapsedSeconds = TotalElapsedSeconds,
.Success = false};
@@ -1115,11 +1116,11 @@ public:
}
}
- virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::GetCacheValues");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
@@ -1145,10 +1146,10 @@ public:
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) {
- if (Params.Payload)
+ Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ if (Params.Value)
{
- OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+ OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
Stats.CacheHitCount.Increment(1);
}
@@ -1166,7 +1167,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1178,13 +1179,13 @@ public:
for (size_t Index : RemainingKeys)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
}
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::GetCachePayload");
+ ZEN_TRACE_CPU("Upstream::GetCacheValue");
if (m_Options.ReadUpstream)
{
@@ -1200,7 +1201,7 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
+ Result = Endpoint->GetCacheValue(CacheKey, ValueContentId);
}
Stats.CacheGetCount.Increment(1);
@@ -1217,7 +1218,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1302,18 +1303,18 @@ private:
return;
}
- for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ for (const IoHash& ValueContentId : CacheRecord.ValueContentIds)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId))
{
Payloads.push_back(Payload);
}
else
{
- ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS",
+ ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
- PayloadId);
+ ValueContentId);
return;
}
}
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 16d8c7929..2087b1fba 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -27,7 +27,7 @@ struct UpstreamCacheRecord
{
ZenContentType Type = ZenContentType::kBinary;
CacheKey Key;
- std::vector<IoHash> PayloadIds;
+ std::vector<IoHash> ValueContentIds;
};
struct UpstreamCacheOptions
@@ -73,14 +73,14 @@ struct CacheRecordGetCompleteParams
using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
-struct CachePayloadGetCompleteParams
+struct CacheValueGetCompleteParams
{
const CacheChunkRequest& Request;
size_t RequestIndex{~size_t(0)};
- IoBuffer Payload;
+ IoBuffer Value;
};
-using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>;
+using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
struct UpstreamEndpointStats
{
@@ -156,11 +156,11 @@ public:
const CacheRecordPolicy& Policy,
OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) = 0;
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
@@ -190,11 +190,11 @@ public:
const CacheRecordPolicy& RecordPolicy,
OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
- virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) = 0;
+ virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) = 0;
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index cd7f48334..a2666ac02 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -433,10 +433,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
}
ZenCacheResult
-ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId)
+ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId)
{
ExtendableStringBuilder<256> Uri;
- Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
@@ -486,10 +486,10 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
}
ZenCacheResult
-ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload)
+ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload)
{
ExtendableStringBuilder<256> Uri;
- Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index c2be2165a..8cc4c121d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -123,9 +123,9 @@ public:
ZenCacheResult CheckHealth();
ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type);
- ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
+ ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId);
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
- ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload);
+ ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload);
ZenCacheResult InvokeRpc(const CbObjectView& Request);
private:
diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h
index fb36c7759..a0a83a883 100644
--- a/zenutil/include/zenutil/cache/cachekey.h
+++ b/zenutil/include/zenutil/cache/cachekey.h
@@ -44,7 +44,7 @@ struct CacheChunkRequest
{
CacheKey Key;
IoHash ChunkId;
- Oid PayloadId;
+ Oid ValueId;
uint64_t RawOffset = 0ull;
uint64_t RawSize = ~uint64_t(0);
CachePolicy Policy = CachePolicy::Default;
@@ -69,11 +69,11 @@ operator<(const CacheChunkRequest& A, const CacheChunkRequest& B)
{
return false;
}
- if (A.PayloadId < B.PayloadId)
+ if (A.ValueId < B.ValueId)
{
return true;
}
- if (B.PayloadId < A.PayloadId)
+ if (B.ValueId < A.ValueId)
{
return false;
}