aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-11 11:19:17 +0100
committerPer Larsson <[email protected]>2021-11-11 11:19:17 +0100
commit2c0e2ab5de21b13dcd25758ca3b96af889db7137 (patch)
tree7ea6325cbbd7b30b996635522975c37b144664f5
parentHonor cache policy. (diff)
downloadzen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.tar.xz
zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.zip
Added batch API to upstream endpoints.
-rw-r--r--zenserver/cache/cachekey.cpp21
-rw-r--r--zenserver/cache/cachekey.h4
-rw-r--r--zenserver/cache/structuredcache.cpp93
-rw-r--r--zenserver/upstream/upstreamcache.cpp346
-rw-r--r--zenserver/upstream/upstreamcache.h65
-rw-r--r--zenserver/upstream/zen.cpp29
-rw-r--r--zenserver/upstream/zen.h3
7 files changed, 413 insertions, 148 deletions
diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp
index eca2d95d5..2ead9ac58 100644
--- a/zenserver/cache/cachekey.cpp
+++ b/zenserver/cache/cachekey.cpp
@@ -3,6 +3,7 @@
#include "cachekey.h"
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/string.h>
namespace zen {
@@ -113,7 +114,7 @@ CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const
}
bool
-CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy)
+CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy)
{
using namespace std::literals;
@@ -139,6 +140,24 @@ CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecor
return true;
}
+void
+CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer)
+{
+ Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy());
+ Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy());
+
+ if (!Policy.m_PayloadPolicies.empty())
+ {
+ Writer.BeginArray("PayloadPolicies"sv);
+ for (const auto& Kv : Policy.m_PayloadPolicies)
+ {
+ Writer.AddObjectId("Id"sv, Kv.first);
+ Writer << "Policy"sv << static_cast<uint32_t>(Kv.second);
+ }
+ Writer.EndArray();
+ }
+}
+
const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()};
} // namespace zen
diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h
index 6ce5d3aab..c32f7ed87 100644
--- a/zenserver/cache/cachekey.h
+++ b/zenserver/cache/cachekey.h
@@ -12,6 +12,7 @@
namespace zen {
class CbObjectView;
+class CbWriter;
enum class CachePolicy : uint8_t
{
@@ -50,7 +51,8 @@ public:
CachePolicy GetPayloadPolicy(const Oid& PayloadId) const;
CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; }
- static bool FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy);
+ static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy);
+ static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer);
private:
using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>;
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index de2fcb27c..721942cc8 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -618,7 +618,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId);
UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
@@ -830,7 +830,7 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
CacheRecordPolicy Policy;
- CacheRecordPolicy::FromCompactBinary(Params["Policy"sv].AsObjectView(), Policy);
+ CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
@@ -899,18 +899,12 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
- CacheKeys,
- UpstreamRequests,
- [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
- const CacheKey& Key = CacheKeys[KeyIndex];
- CbPackage UpstreamPackage;
- if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
+ const auto OnCacheRecordGetComplete =
+ [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
{
- CbObjectView CacheRecord = UpstreamPackage.GetObject();
-
- CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
+ Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash()))
{
if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
{
@@ -925,20 +919,21 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
});
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
- Key.Bucket,
- Key.Hash,
- NiceBytes(UpstreamValue.Size()),
- ToString(UpstreamValue.GetContentType()));
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash,
+ NiceBytes(Params.Record.GetView().GetSize()),
+ ToString(HttpContentType::kCbObject));
- CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
+ CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView());
m_CacheStats.UpstreamHitCount++;
}
- });
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ };
- for (size_t MissingUpstream : UpstreamResult.Missing)
- {
- Missing.push_back(MissingUpstream);
- }
+ m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
}
for (size_t KeyIndex : Missing)
@@ -1104,40 +1099,32 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- auto UpstreamResult = m_UpstreamCache->GetCachePayloads(
- ChunkRequests,
- UpstreamRequests,
- [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) {
- const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex];
-
- if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue)))
- {
- auto InsertResult = m_CidStore.AddChunk(CompressedChunk);
- IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer();
+ const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload));
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- ChunkRequest.ChunkId,
- NiceBytes(Chunk.GetSize()),
- ToString(Chunk.GetContentType()),
- "UPSTREAM");
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ Params.Request.Key.Bucket,
+ Params.Request.Key.Hash,
+ Params.Request.ChunkId,
+ NiceBytes(Params.Payload.GetSize()),
+ ToString(Params.Payload.GetContentType()),
+ "UPSTREAM");
- Chunks[ChunkIndex] = Chunk;
+ Chunks[Params.RequestIndex] = std::move(Params.Payload);
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache payload");
- }
- });
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ };
- for (size_t MissingUpstream : UpstreamResult.Missing)
- {
- Missing.push_back(MissingUpstream);
- }
+ m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
}
for (size_t RequestIndex : Missing)
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 7ef0caf62..0a0706656 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -70,7 +70,7 @@ namespace detail {
virtual std::string_view DisplayName() const override { return m_DisplayName; }
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
{
@@ -144,12 +144,54 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override
+ {
+ ZEN_UNUSED(Policy);
+
+ CloudCacheSession Session(m_Client);
+
+ for (size_t Index : KeyIndex)
+ {
+ const CacheKey& CacheKey = CacheKeys[Index];
+ CloudCacheResult Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+
+ CbPackage Package;
+ CbObjectView Record;
+
+ if (Result.Success)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (ValidationResult == CbValidateError::None)
+ {
+ Record = CbObjectView(Result.Response.GetData());
+ Record.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ if (AttachmentResult.Success)
+ {
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ }
+ });
+ }
+ }
+
+ OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package});
+ }
+
+ return {};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
{
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
if (Result.ErrorCode == 0)
{
@@ -171,6 +213,29 @@ namespace detail {
}
}
+ virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ CloudCacheSession Session(m_Client);
+
+ for (size_t Index : RequestIndex)
+ {
+ const CacheChunkRequest& Request = CacheChunkRequests[Index];
+ const CloudCacheResult Result = Session.GetCompressedBlob(Request.ChunkId);
+
+ OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Result.Response});
+
+ if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ break;
+ }
+ }
+
+ return {};
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
@@ -419,7 +484,7 @@ namespace detail {
virtual std::string_view DisplayName() const override { return m_DisplayName; }
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
{
@@ -446,13 +511,81 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override
+ {
+ std::vector<size_t> IndexMap;
+ IndexMap.reserve(KeyIndex.size());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCacheRecords";
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ BatchRequest.BeginArray("CacheKeys"sv);
+ for (size_t Index : KeyIndex)
+ {
+ const CacheKey& Key = CacheKeys[Index];
+ IndexMap.push_back(Index);
+
+ BatchRequest.BeginObject();
+ BatchRequest << "Bucket"sv << Key.Bucket;
+ BatchRequest << "Hash"sv << Key.Hash;
+ BatchRequest.EndObject();
+ }
+ BatchRequest.EndArray();
+
+ BatchRequest.BeginObject("Policy"sv);
+ CacheRecordPolicy::Save(Policy, BatchRequest);
+ BatchRequest.EndObject();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ bool Success = false;
+
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save());
+ if (Result.Success)
+ {
+ Success = BatchResponse.TryLoad(Result.Response);
+ }
+ else if (Result.ErrorCode)
+ {
+ Success = m_HealthOk = false;
+ }
+ }
+
+ if (!Success)
+ {
+ for (size_t Index : KeyIndex)
+ {
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ }
+
+ return {};
+ }
+
+ for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ }
+
+ return {};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
{
try
{
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result =
- Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
if (Result.ErrorCode == 0)
{
@@ -474,6 +607,91 @@ namespace detail {
}
}
+ virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> IndexMap;
+ IndexMap.reserve(RequestIndex.size());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCachePayloads";
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ BatchRequest.BeginArray("ChunkRequests"sv);
+ {
+ for (size_t Index : RequestIndex)
+ {
+ const CacheChunkRequest& Request = CacheChunkRequests[Index];
+ IndexMap.push_back(Index);
+
+ BatchRequest.BeginObject();
+ {
+ BatchRequest.BeginObject("Key"sv);
+ BatchRequest << "Bucket"sv << Request.Key.Bucket;
+ BatchRequest << "Hash"sv << Request.Key.Hash;
+ BatchRequest.EndObject();
+
+ BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId);
+ BatchRequest << "ChunkId"sv << Request.ChunkId;
+ BatchRequest << "RawOffset"sv << Request.RawOffset;
+ BatchRequest << "RawSize"sv << Request.RawSize;
+ BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy);
+ }
+ BatchRequest.EndObject();
+ }
+ }
+ BatchRequest.EndArray();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ bool Success = false;
+
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save());
+ if (Result.Success)
+ {
+ Success = BatchResponse.TryLoad(Result.Response);
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ }
+ }
+
+ if (!Success)
+ {
+ for (size_t Index : RequestIndex)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ }
+
+ return {};
+ }
+
+ for (int32_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ IoBuffer Payload;
+
+ if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ }
+ }
+
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ }
+
+ return {};
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
@@ -758,7 +976,7 @@ public:
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
if (m_Options.ReadUpstream)
{
@@ -780,94 +998,82 @@ public:
return {};
}
- virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- OnCacheGetComplete OnComplete) override final
+ virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override final
{
- if (!m_Options.ReadUpstream)
- {
- return {.Missing = std::vector<size_t>(KeyIndex.begin(), KeyIndex.end())};
- }
+ std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end());
- GetUpstreamCacheBatchResult Result;
-
- for (size_t Idx : KeyIndex)
+ if (m_Options.ReadUpstream)
{
- const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash};
-
- GetUpstreamCacheResult CacheResult;
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy())
+ if (Endpoint->IsHealthy() && !MissingKeys.empty())
{
- CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage);
- m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints);
+ std::vector<size_t> Missing;
- if (CacheResult.Success)
- {
- break;
- }
- }
- }
+ auto EndpointResult =
+ Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
- if (CacheResult.Success)
- {
- OnComplete(Idx, CacheResult.Value);
- }
- else
- {
- Result.Missing.push_back(Idx);
+ MissingKeys = std::move(Missing);
+ }
}
}
- return Result;
- }
-
- virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> ChunkIndex,
- OnCacheGetComplete OnComplete) override final
- {
- if (!m_Options.ReadUpstream)
+ for (size_t Index : MissingKeys)
{
- return {.Missing = std::vector<size_t>(ChunkIndex.begin(), ChunkIndex.end())};
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
}
+ }
- GetUpstreamCacheBatchResult Result;
+ virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end());
- for (size_t Idx : ChunkIndex)
+ if (m_Options.ReadUpstream)
{
- const CacheChunkRequest& Chunk = CacheChunkRequests[Idx];
- UpstreamPayloadKey PayloadKey{{Chunk.Key.Bucket, Chunk.Key.Hash}, Chunk.ChunkId};
-
- GetUpstreamCacheResult CacheResult;
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy())
+ if (Endpoint->IsHealthy() && !MissingPayloads.empty())
{
- CacheResult = Endpoint->GetCachePayload(PayloadKey);
- m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints);
+ std::vector<size_t> Missing;
- if (CacheResult.Success)
- {
- break;
- }
- }
- }
+ auto EndpointResult =
+ Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ });
- if (CacheResult.Success)
- {
- OnComplete(Idx, CacheResult.Value);
- }
- else
- {
- Result.Missing.push_back(Idx);
+ MissingPayloads = std::move(Missing);
+ }
}
}
- return Result;
+ for (size_t Index : MissingPayloads)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ }
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
{
if (m_Options.ReadUpstream)
{
@@ -875,7 +1081,7 @@ public:
{
if (Endpoint->IsHealthy())
{
- const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey);
+ const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
if (Result.Success)
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index a7bae302d..67bb73b4d 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -15,27 +15,17 @@
namespace zen {
+class CbObjectView;
+class CbPackage;
class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
-struct UpstreamCacheKey
-{
- std::string Bucket;
- IoHash Hash;
-};
-
-struct UpstreamPayloadKey
-{
- UpstreamCacheKey CacheKey;
- IoHash PayloadId;
-};
-
struct UpstreamCacheRecord
{
ZenContentType Type = ZenContentType::kBinary;
- UpstreamCacheKey CacheKey;
+ CacheKey CacheKey;
std::vector<IoHash> PayloadIds;
};
@@ -98,6 +88,25 @@ struct UpstreamEndpointStats
using OnCacheGetComplete = std::function<void(size_t, IoBuffer)>;
+struct CacheRecordGetCompleteParams
+{
+ const CacheKey& CacheKey;
+ size_t KeyIndex = ~size_t(0);
+ const CbObjectView& Record;
+ const CbPackage& Package;
+};
+
+using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
+
+struct CachePayloadGetCompleteParams
+{
+ const CacheChunkRequest& Request;
+ size_t RequestIndex{~size_t(0)};
+ IoBuffer Payload;
+};
+
+using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>;
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -114,9 +123,18 @@ public:
virtual std::string_view DisplayName() const = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
+
+ virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+
+ virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) = 0;
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
@@ -137,17 +155,18 @@ public:
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
- virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys,
- std::span<size_t> KeyIndex,
- OnCacheGetComplete OnComplete) = 0;
+ virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& RecordPolicy,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
- virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCacheGetComplete OnComplete) = 0;
+ virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) = 0;
struct EnqueueResult
{
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 14333f45a..d11058180 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -499,4 +499,33 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
.Success = (Response.status_code == 200 || Response.status_code == 201)};
}
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/$batch";
+
+ BinaryWriter Body;
+ Request.CopyTo(Body);
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbobject"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 12e46bd8d..5efe19094 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -28,6 +28,8 @@ class logger;
namespace zen {
class CbObjectWriter;
+class CbObjectView;
+class CbPackage;
class ZenStructuredCacheClient;
/** Zen mesh tracker
@@ -116,6 +118,7 @@ public:
ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload);
+ ZenCacheResult InvokeRpc(const CbObjectView& Request);
private:
inline spdlog::logger& Log() { return m_Log; }