aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-12 11:08:53 +0100
committerPer Larsson <[email protected]>2021-11-12 11:08:53 +0100
commit3faf0b57c625152a8facfca1c4995bd9edc95707 (patch)
tree0ae1ff078147091bafb7f5f4799943cebd0a7f50 /zenserver/cache/structuredcache.cpp
parentChanged from batch to RPC. (diff)
downloadzen-3faf0b57c625152a8facfca1c4995bd9edc95707.tar.xz
zen-3faf0b57c625152a8facfca1c4995bd9edc95707.zip
Movec cache utility types to zenutil and fixed unit tests.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp49
1 files changed, 26 insertions, 23 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index e78e583aa..1b6406562 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -11,8 +11,9 @@
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include <zenutil/cache/cache.h>
-#include "cachekey.h"
+//#include "cachekey.h"
#include "monitoring/httpstats.h"
#include "structuredcache.h"
#include "structuredcachestore.h"
@@ -796,15 +797,15 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
Request.WriteResponseAsync(
- [this, BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
- const std::string_view Method = BatchRequest["Method"sv].AsString();
+ [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
+ const std::string_view Method = RpcRequest["Method"sv].AsString();
if (Method == "GetCacheRecords"sv)
{
- HandleRpcGetCacheRecords(AsyncRequest, BatchRequest);
+ HandleRpcGetCacheRecords(AsyncRequest, RpcRequest);
}
else if (Method == "GetCachePayloads"sv)
{
- HandleRpcGetCachePayloads(AsyncRequest, BatchRequest);
+ HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
}
else
{
@@ -820,18 +821,18 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
void
-HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest)
+HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
using namespace fmt::literals;
- CbPackage BatchResponse;
+ CbPackage RpcResponse;
CacheRecordPolicy Policy;
- CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::vector<CacheKey> CacheKeys;
std::vector<IoBuffer> CacheValues;
std::vector<size_t> UpstreamRequests;
- ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
@@ -860,10 +861,10 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!SkipAttachments)
{
- CacheRecord.IterateAttachments([this, &BatchResponse](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &RpcResponse](CbFieldView AttachmentHash) {
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
}
});
}
@@ -893,7 +894,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!UpstreamRequests.empty() && m_UpstreamCache)
{
const auto OnCacheRecordGetComplete =
- [this, &CacheKeys, &CacheValues, &BatchResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ [this, &CacheKeys, &CacheValues, &RpcResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
if (Params.Record)
{
Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) {
@@ -905,7 +906,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!SkipAttachments)
{
- BatchResponse.AddAttachment(CbAttachment(Compressed));
+ RpcResponse.AddAttachment(CbAttachment(Compressed));
}
}
}
@@ -917,6 +918,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
NiceBytes(Params.Record.GetView().GetSize()),
ToString(HttpContentType::kCbObject));
+ ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView());
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -948,10 +950,10 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
ResponseObject.EndArray();
- BatchResponse.SetObject(ResponseObject.Save());
+ RpcResponse.SetObject(ResponseObject.Save());
BinaryWriter MemStream;
- BatchResponse.Save(MemStream);
+ RpcResponse.Save(MemStream);
Request.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
@@ -959,16 +961,16 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest)
+HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
using namespace fmt::literals;
- ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCachePayloads"sv);
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv);
std::vector<CacheChunkRequest> ChunkRequests;
std::vector<size_t> UpstreamRequests;
std::vector<IoBuffer> Chunks;
- CbObjectView Params = BatchRequest["Params"sv].AsObjectView();
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
for (CbFieldView RequestView : Params["ChunkRequests"sv])
{
@@ -1029,7 +1031,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CurrentKey = ChunkRequest.Key;
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue))
{
CurrentRecordBuffer = CacheValue.Value;
}
@@ -1094,6 +1096,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
NiceBytes(Params.Payload.GetSize()),
"UPSTREAM");
+ ZEN_ASSERT(Params.RequestIndex < Chunks.size());
Chunks[Params.RequestIndex] = std::move(Params.Payload);
m_CacheStats.HitCount++;
@@ -1109,7 +1112,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
}
- CbPackage BatchResponse;
+ CbPackage RpcResponse;
CbObjectWriter ResponseObject;
ResponseObject.BeginArray("Result"sv);
@@ -1119,7 +1122,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (Chunks[ChunkIndex])
{
ResponseObject << ChunkRequests[ChunkIndex].ChunkId;
- BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
}
else
{
@@ -1128,10 +1131,10 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
ResponseObject.EndArray();
- BatchResponse.SetObject(ResponseObject.Save());
+ RpcResponse.SetObject(ResponseObject.Save());
BinaryWriter MemStream;
- BatchResponse.Save(MemStream);
+ RpcResponse.Save(MemStream);
Request.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,