aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-11 11:19:17 +0100
committerPer Larsson <[email protected]>2021-11-11 11:19:17 +0100
commit2c0e2ab5de21b13dcd25758ca3b96af889db7137 (patch)
tree7ea6325cbbd7b30b996635522975c37b144664f5 /zenserver/cache
parentHonor cache policy. (diff)
downloadzen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.tar.xz
zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.zip
Added batch API to upstream endpoints.
Diffstat (limited to 'zenserver/cache')
-rw-r--r--zenserver/cache/cachekey.cpp21
-rw-r--r--zenserver/cache/cachekey.h4
-rw-r--r--zenserver/cache/structuredcache.cpp93
3 files changed, 63 insertions, 55 deletions
diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp
index eca2d95d5..2ead9ac58 100644
--- a/zenserver/cache/cachekey.cpp
+++ b/zenserver/cache/cachekey.cpp
@@ -3,6 +3,7 @@
#include "cachekey.h"
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/string.h>
namespace zen {
@@ -113,7 +114,7 @@ CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const
}
bool
-CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy)
+CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy)
{
using namespace std::literals;
@@ -139,6 +140,24 @@ CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecor
return true;
}
+void
+CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer)
+{
+ Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy());
+ Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy());
+
+ if (!Policy.m_PayloadPolicies.empty())
+ {
+ Writer.BeginArray("PayloadPolicies"sv);
+ for (const auto& Kv : Policy.m_PayloadPolicies)
+ {
+ Writer.AddObjectId("Id"sv, Kv.first);
+ Writer << "Policy"sv << static_cast<uint32_t>(Kv.second);
+ }
+ Writer.EndArray();
+ }
+}
+
const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()};
} // namespace zen
diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h
index 6ce5d3aab..c32f7ed87 100644
--- a/zenserver/cache/cachekey.h
+++ b/zenserver/cache/cachekey.h
@@ -12,6 +12,7 @@
namespace zen {
class CbObjectView;
+class CbWriter;
enum class CachePolicy : uint8_t
{
@@ -50,7 +51,8 @@ public:
CachePolicy GetPayloadPolicy(const Oid& PayloadId) const;
CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; }
- static bool FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy);
+ static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy);
+ static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer);
private:
using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>;
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index de2fcb27c..721942cc8 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -618,7 +618,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId);
UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
@@ -830,7 +830,7 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
CacheRecordPolicy Policy;
- CacheRecordPolicy::FromCompactBinary(Params["Policy"sv].AsObjectView(), Policy);
+ CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
@@ -899,18 +899,12 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
- CacheKeys,
- UpstreamRequests,
- [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
- const CacheKey& Key = CacheKeys[KeyIndex];
- CbPackage UpstreamPackage;
- if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
+ const auto OnCacheRecordGetComplete =
+ [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
{
- CbObjectView CacheRecord = UpstreamPackage.GetObject();
-
- CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
+ Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash()))
{
if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
{
@@ -925,20 +919,21 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
});
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
- Key.Bucket,
- Key.Hash,
- NiceBytes(UpstreamValue.Size()),
- ToString(UpstreamValue.GetContentType()));
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash,
+ NiceBytes(Params.Record.GetView().GetSize()),
+ ToString(HttpContentType::kCbObject));
- CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
+ CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView());
m_CacheStats.UpstreamHitCount++;
}
- });
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ };
- for (size_t MissingUpstream : UpstreamResult.Missing)
- {
- Missing.push_back(MissingUpstream);
- }
+ m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
}
for (size_t KeyIndex : Missing)
@@ -1104,40 +1099,32 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- auto UpstreamResult = m_UpstreamCache->GetCachePayloads(
- ChunkRequests,
- UpstreamRequests,
- [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) {
- const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex];
-
- if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue)))
- {
- auto InsertResult = m_CidStore.AddChunk(CompressedChunk);
- IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer();
+ const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload));
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- ChunkRequest.ChunkId,
- NiceBytes(Chunk.GetSize()),
- ToString(Chunk.GetContentType()),
- "UPSTREAM");
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ Params.Request.Key.Bucket,
+ Params.Request.Key.Hash,
+ Params.Request.ChunkId,
+ NiceBytes(Params.Payload.GetSize()),
+ ToString(Params.Payload.GetContentType()),
+ "UPSTREAM");
- Chunks[ChunkIndex] = Chunk;
+ Chunks[Params.RequestIndex] = std::move(Params.Payload);
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache payload");
- }
- });
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ };
- for (size_t MissingUpstream : UpstreamResult.Missing)
- {
- Missing.push_back(MissingUpstream);
- }
+ m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
}
for (size_t RequestIndex : Missing)