diff options
| author | Per Larsson <[email protected]> | 2021-11-11 11:19:17 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-11 11:19:17 +0100 |
| commit | 2c0e2ab5de21b13dcd25758ca3b96af889db7137 (patch) | |
| tree | 7ea6325cbbd7b30b996635522975c37b144664f5 /zenserver/cache | |
| parent | Honor cache policy. (diff) | |
| download | zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.tar.xz zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.zip | |
Added batch API to upstream endpoints.
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/cachekey.cpp | 21 | ||||
| -rw-r--r-- | zenserver/cache/cachekey.h | 4 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 93 |
3 files changed, 63 insertions, 55 deletions
diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp index eca2d95d5..2ead9ac58 100644 --- a/zenserver/cache/cachekey.cpp +++ b/zenserver/cache/cachekey.cpp @@ -3,6 +3,7 @@ #include "cachekey.h" #include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/string.h> namespace zen { @@ -113,7 +114,7 @@ CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const } bool -CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy) +CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy) { using namespace std::literals; @@ -139,6 +140,24 @@ CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecor return true; } +void +CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer) +{ + Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy()); + Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy()); + + if (!Policy.m_PayloadPolicies.empty()) + { + Writer.BeginArray("PayloadPolicies"sv); + for (const auto& Kv : Policy.m_PayloadPolicies) + { + Writer.AddObjectId("Id"sv, Kv.first); + Writer << "Policy"sv << static_cast<uint32_t>(Kv.second); + } + Writer.EndArray(); + } +} + const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()}; } // namespace zen diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h index 6ce5d3aab..c32f7ed87 100644 --- a/zenserver/cache/cachekey.h +++ b/zenserver/cache/cachekey.h @@ -12,6 +12,7 @@ namespace zen { class CbObjectView; +class CbWriter; enum class CachePolicy : uint8_t { @@ -50,7 +51,8 @@ public: CachePolicy GetPayloadPolicy(const Oid& PayloadId) const; CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; } - static bool FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy); + static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy); + static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer); private: using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index de2fcb27c..721942cc8 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -618,7 +618,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) @@ -830,7 +830,7 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); CacheRecordPolicy Policy; - CacheRecordPolicy::FromCompactBinary(Params["Policy"sv].AsObjectView(), Policy); + CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); @@ -899,18 +899,12 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R if (!UpstreamRequests.empty() && m_UpstreamCache) { - auto UpstreamResult = m_UpstreamCache->GetCacheRecords( - CacheKeys, - UpstreamRequests, - [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { - const CacheKey& Key = CacheKeys[KeyIndex]; - CbPackage UpstreamPackage; - if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue)) + const auto OnCacheRecordGetComplete = + [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) { - CbObjectView CacheRecord = UpstreamPackage.GetObject(); - - CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash())) + Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash())) { if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) { @@ -925,20 +919,21 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R }); ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", - Key.Bucket, - Key.Hash, - NiceBytes(UpstreamValue.Size()), - ToString(UpstreamValue.GetContentType())); + Params.CacheKey.Bucket, + Params.CacheKey.Hash, + NiceBytes(Params.Record.GetView().GetSize()), + ToString(HttpContentType::kCbObject)); - CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView()); + CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView()); m_CacheStats.UpstreamHitCount++; } - }); + else + { + Missing.push_back(Params.KeyIndex); + } + }; - for (size_t MissingUpstream : UpstreamResult.Missing) - { - Missing.push_back(MissingUpstream); - } + m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); } for (size_t KeyIndex : Missing) @@ -1104,40 +1099,32 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& if (!UpstreamRequests.empty() && m_UpstreamCache) { - auto UpstreamResult = m_UpstreamCache->GetCachePayloads( - ChunkRequests, - UpstreamRequests, - [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) { - const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex]; - - if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue))) - { - auto InsertResult = m_CidStore.AddChunk(CompressedChunk); - IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer(); + const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)); + auto InsertResult = m_CidStore.AddChunk(Compressed); - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - ChunkRequest.ChunkId, - NiceBytes(Chunk.GetSize()), - ToString(Chunk.GetContentType()), - "UPSTREAM"); + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + Params.Request.Key.Bucket, + Params.Request.Key.Hash, + Params.Request.ChunkId, + NiceBytes(Params.Payload.GetSize()), + ToString(Params.Payload.GetContentType()), + "UPSTREAM"); - Chunks[ChunkIndex] = Chunk; + Chunks[Params.RequestIndex] = std::move(Params.Payload); - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - } - else - { - ZEN_WARN("got uncompressed upstream cache payload"); - } - }); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + Missing.push_back(Params.RequestIndex); + } + }; - for (size_t MissingUpstream : UpstreamResult.Missing) - { - Missing.push_back(MissingUpstream); - } + m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); } for (size_t RequestIndex : Missing) |