aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-09 13:20:00 +0100
committerPer Larsson <[email protected]>2021-11-09 13:20:00 +0100
commite0d54396fa3ba0f5466a4ea1f2810721c18fa55f (patch)
tree4ab63a83dac7e1e9245d62be1918d12f4a55ae8d /zenserver/cache/structuredcache.cpp
parentAdded batched get chunk(s). (diff)
downloadzen-e0d54396fa3ba0f5466a4ea1f2810721c18fa55f.tar.xz
zen-e0d54396fa3ba0f5466a4ea1f2810721c18fa55f.zip
Sort cache keys when resolving payload ID's.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp156
1 files changed, 117 insertions, 39 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index a1b0e1549..decad2f04 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -809,7 +809,7 @@ HttpStructuredCacheService::HandleBatchRequest(zen::HttpServerRequest& Request,
}
else if (Method == "get-cache-chunks"sv)
{
- HandleBatchGetCacheChunks(Request, BatchRequest, Policy);
+ HandleBatchGetCachePayloads(Request, BatchRequest, Policy);
}
else
{
@@ -848,6 +848,11 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
CacheKeys.push_back(CacheKey::Create(Query["bucket"sv].AsString(), Query["hash"sv].AsHash()));
}
+ if (CacheKeys.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
CacheValues.resize(CacheKeys.size());
for (size_t Idx = 0; const CacheKey& Key : CacheKeys)
@@ -867,7 +872,14 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
});
}
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(CacheValue.Value.Size()),
+ ToString(CacheValue.Value.GetContentType()));
+
CacheValues[Idx] = CacheValue.Value;
+ m_CacheStats.HitCount++;
}
else
{
@@ -882,8 +894,9 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
auto UpstreamResult = m_UpstreamCache->GetCacheRecords(
CacheKeys,
Missing,
- [this, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
- CbPackage UpstreamPackage;
+ [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) {
+ const CacheKey& Key = CacheKeys[KeyIndex];
+ CbPackage UpstreamPackage;
if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue))
{
CbObjectView CacheRecord = UpstreamPackage.GetObject();
@@ -903,9 +916,25 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
}
});
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(UpstreamValue.Size()),
+ ToString(UpstreamValue.GetContentType()));
+
CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView());
+ m_CacheStats.UpstreamHitCount++;
}
});
+
+ Missing = std::move(UpstreamResult.Missing);
+ }
+
+ for (size_t Idx : Missing)
+ {
+ const CacheKey& Key = CacheKeys[Idx];
+ ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ m_CacheStats.MissCount++;
}
CbObjectWriter BatchResponse;
@@ -942,19 +971,10 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R
}
void
-HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy)
+HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest, CachePolicy Policy)
{
using namespace fmt::literals;
- const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
- const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
- const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
-
- const std::string_view Method = BatchRequest["method"sv].AsString();
- ZEN_ASSERT(Method == "get-cache-chunks"sv);
-
- CbObjectView Params = BatchRequest["params"sv].AsObjectView();
-
const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
if (CbObjectView ValueObject = Record["Value"].AsObjectView())
{
@@ -979,68 +999,126 @@ HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Re
return IoHash::Zero;
};
- std::vector<CacheChunk> CacheChunks;
- std::vector<CachePolicy> ChunkPolicies;
- std::vector<size_t> Missing;
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
+
+ const std::string_view Method = BatchRequest["method"sv].AsString();
+ ZEN_ASSERT(Method == "get-cache-chunks"sv);
+
+ CbObjectView Params = BatchRequest["params"sv].AsObjectView();
+
+ std::vector<CacheChunkRequest> ChunkRequests;
+ std::vector<size_t> Missing;
- for (CbFieldView ChunkView : Params["cachechunks"sv])
+ for (CbFieldView ChunkView : Params["chunkrequests"sv])
{
CbObjectView Chunk = ChunkView.AsObjectView();
CbObjectView CacheKeyObject = Chunk["key"sv].AsObjectView();
const CacheKey Key = CacheKey::Create(CacheKeyObject["bucket"sv].AsString(), CacheKeyObject["hash"sv].AsHash());
- const Oid PayloadId = Chunk["id"sv].AsObjectId();
+ const IoHash ChunkId = IoHash::Zero;
+ const Oid PayloadId = Chunk["payloadid"sv].AsObjectId();
const uint64_t RawOffset = Chunk["rawoffset"sv].AsUInt64();
const uint64_t RawSize = Chunk["rawsize"sv].AsUInt64();
const uint32_t ChunkPolicy = Chunk["policy"sv].AsUInt32();
- IoHash ChunkId = IoHash::Zero;
+ ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy));
+ }
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ std::stable_sort(ChunkRequests.begin(), ChunkRequests.end());
+
+ CacheKey CurrentKey = CacheKey::Empty;
+ IoBuffer CurrentRecordBuffer;
+
+ for (CacheChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ if (ChunkRequest.Key != CurrentKey)
{
- ChunkId = GetChunkIdFromPayloadId(CbObjectView(CacheValue.Value.Data()), PayloadId);
+ CurrentKey = ChunkRequest.Key;
+
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue))
+ {
+ CurrentRecordBuffer = CacheValue.Value;
+ }
}
- CacheChunks.emplace_back(Key, ChunkId, RawOffset, RawSize);
- ChunkPolicies.emplace_back(static_cast<CachePolicy>(ChunkPolicy));
+ if (CurrentRecordBuffer)
+ {
+ ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ }
}
- if (CacheChunks.empty())
+ if (ChunkRequests.empty())
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
std::vector<IoBuffer> Chunks;
- Chunks.resize(CacheChunks.size());
+ Chunks.resize(ChunkRequests.size());
- for (size_t Idx = 0; CacheChunk & CacheChunk : CacheChunks)
+ for (size_t Idx = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests)
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(CacheChunk.Id))
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId))
{
ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
- CacheChunk.Key.Bucket,
- CacheChunk.Key.Hash,
- CacheChunk.Id,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ ChunkRequest.ChunkId,
NiceBytes(Chunk.Size()),
ToString(Chunk.GetContentType()),
"LOCAL");
Chunks[Idx] = Chunk;
+ m_CacheStats.HitCount++;
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}' '{}'",
- CacheChunk.Key.Bucket,
- CacheChunk.Key.Hash,
- CacheChunk.Id,
- ToString(HttpContentType::kCompressedBinary));
-
- CacheChunk.Id = IoHash::Zero;
Missing.push_back(Idx);
}
++Idx;
}
+ if (!Missing.empty() && QueryUpstream)
+ {
+ auto UpstreamResult = m_UpstreamCache->GetCachePayloads(
+ ChunkRequests,
+ Missing,
+ [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) {
+ const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex];
+
+ if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue)))
+ {
+ auto InsertResult = m_CidStore.AddChunk(CompressedChunk);
+ IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer();
+
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ ChunkRequest.ChunkId,
+ NiceBytes(Chunk.GetSize()),
+ ToString(Chunk.GetContentType()),
+ "UPSTREAM");
+
+ Chunks[ChunkIndex] = Chunk;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ ZEN_WARN("got uncompressed upstream cache payload");
+ }
+ });
+
+ Missing = std::move(UpstreamResult.Missing);
+ }
+
+ for (size_t Idx : Missing)
+ {
+ const CacheChunkRequest& ChunkRequest = ChunkRequests[Idx];
+ ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ m_CacheStats.MissCount++;
+ }
+
CbPackage Package;
CbObjectWriter BatchResponse;
@@ -1050,7 +1128,7 @@ HttpStructuredCacheService::HandleBatchGetCacheChunks(zen::HttpServerRequest& Re
{
if (Chunks[Idx])
{
- BatchResponse << CacheChunks[Idx].Id;
+ BatchResponse << ChunkRequests[Idx].ChunkId;
Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[Idx])))));
}
else