aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-11 11:19:17 +0100
committerPer Larsson <[email protected]>2021-11-11 11:19:17 +0100
commit2c0e2ab5de21b13dcd25758ca3b96af889db7137 (patch)
tree7ea6325cbbd7b30b996635522975c37b144664f5 /zenserver/cache/structuredcache.cpp
parentHonor cache policy. (diff)
downloadzen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.tar.xz
zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.zip
Added batch API to upstream endpoints.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp93
1 files changed, 40 insertions, 53 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index de2fcb27c..721942cc8 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -618,7 +618,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId);
UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
@@ -830,7 +830,7 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
CacheRecordPolicy Policy;
- CacheRecordPolicy::FromCompactBinary(Params["Policy"sv].AsObjectView(), Policy);
+ CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
@@ -899,18 +899,12 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
- CacheKeys,
- UpstreamRequests,
- [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
- const CacheKey& Key = CacheKeys[KeyIndex];
- CbPackage UpstreamPackage;
- if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
+ const auto OnCacheRecordGetComplete =
+ [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
{
- CbObjectView CacheRecord = UpstreamPackage.GetObject();
-
- CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash()))
+ Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash()))
{
if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
{
@@ -925,20 +919,21 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
});
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
- Key.Bucket,
- Key.Hash,
- NiceBytes(UpstreamValue.Size()),
- ToString(UpstreamValue.GetContentType()));
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash,
+ NiceBytes(Params.Record.GetView().GetSize()),
+ ToString(HttpContentType::kCbObject));
- CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
+ CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView());
m_CacheStats.UpstreamHitCount++;
}
- });
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ };
- for (size_t MissingUpstream : UpstreamResult.Missing)
- {
- Missing.push_back(MissingUpstream);
- }
+ m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
}
for (size_t KeyIndex : Missing)
@@ -1104,40 +1099,32 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest&
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
- auto UpstreamResult = m_UpstreamCache->GetCachePayloads(
- ChunkRequests,
- UpstreamRequests,
- [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) {
- const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex];
-
- if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue)))
- {
- auto InsertResult = m_CidStore.AddChunk(CompressedChunk);
- IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer();
+ const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload));
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- ChunkRequest.ChunkId,
- NiceBytes(Chunk.GetSize()),
- ToString(Chunk.GetContentType()),
- "UPSTREAM");
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ Params.Request.Key.Bucket,
+ Params.Request.Key.Hash,
+ Params.Request.ChunkId,
+ NiceBytes(Params.Payload.GetSize()),
+ ToString(Params.Payload.GetContentType()),
+ "UPSTREAM");
- Chunks[ChunkIndex] = Chunk;
+ Chunks[Params.RequestIndex] = std::move(Params.Payload);
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache payload");
- }
- });
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ };
- for (size_t MissingUpstream : UpstreamResult.Missing)
- {
- Missing.push_back(MissingUpstream);
- }
+ m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
}
for (size_t RequestIndex : Missing)