diff options
| author | Per Larsson <[email protected]> | 2021-11-12 11:08:53 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-12 11:08:53 +0100 |
| commit | 3faf0b57c625152a8facfca1c4995bd9edc95707 (patch) | |
| tree | 0ae1ff078147091bafb7f5f4799943cebd0a7f50 /zenserver/cache/structuredcache.cpp | |
| parent | Changed from batch to RPC. (diff) | |
| download | zen-3faf0b57c625152a8facfca1c4995bd9edc95707.tar.xz zen-3faf0b57c625152a8facfca1c4995bd9edc95707.zip | |
Movec cache utility types to zenutil and fixed unit tests.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 49 |
1 files changed, 26 insertions, 23 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index e78e583aa..1b6406562 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -11,8 +11,9 @@ #include <zencore/timer.h> #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include <zenutil/cache/cache.h> -#include "cachekey.h" +//#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcache.h" #include "structuredcachestore.h" @@ -796,15 +797,15 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } Request.WriteResponseAsync( - [this, BatchRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { - const std::string_view Method = BatchRequest["Method"sv].AsString(); + [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { + const std::string_view Method = RpcRequest["Method"sv].AsString(); if (Method == "GetCacheRecords"sv) { - HandleRpcGetCacheRecords(AsyncRequest, BatchRequest); + HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); } else if (Method == "GetCachePayloads"sv) { - HandleRpcGetCachePayloads(AsyncRequest, BatchRequest); + HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); } else { @@ -820,18 +821,18 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } void -HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest) +HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { using namespace fmt::literals; - CbPackage BatchResponse; + CbPackage RpcResponse; CacheRecordPolicy Policy; - CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::vector<CacheKey> CacheKeys; std::vector<IoBuffer> CacheValues; std::vector<size_t> UpstreamRequests; - ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCacheRecords"sv); + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); @@ -860,10 +861,10 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!SkipAttachments) { - CacheRecord.IterateAttachments([this, &BatchResponse](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &RpcResponse](CbFieldView AttachmentHash) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); } }); } @@ -893,7 +894,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!UpstreamRequests.empty() && m_UpstreamCache) { const auto OnCacheRecordGetComplete = - [this, &CacheKeys, &CacheValues, &BatchResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + [this, &CacheKeys, &CacheValues, &RpcResponse, SkipAttachments](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) { @@ -905,7 +906,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!SkipAttachments) { - BatchResponse.AddAttachment(CbAttachment(Compressed)); + RpcResponse.AddAttachment(CbAttachment(Compressed)); } } } @@ -917,6 +918,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req NiceBytes(Params.Record.GetView().GetSize()), ToString(HttpContentType::kCbObject)); + ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView()); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -948,10 +950,10 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } ResponseObject.EndArray(); - BatchResponse.SetObject(ResponseObject.Save()); + RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; - BatchResponse.Save(MemStream); + RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, @@ -959,16 +961,16 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest) +HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { using namespace fmt::literals; - ZEN_ASSERT(BatchRequest["Method"sv].AsString() == "GetCachePayloads"sv); + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv); std::vector<CacheChunkRequest> ChunkRequests; std::vector<size_t> UpstreamRequests; std::vector<IoBuffer> Chunks; - CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); for (CbFieldView RequestView : Params["ChunkRequests"sv]) { @@ -1029,7 +1031,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CurrentKey = ChunkRequest.Key; ZenCacheValue CacheValue; - if (m_CacheStore.Get(ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, CacheValue)) + if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue)) { CurrentRecordBuffer = CacheValue.Value; } @@ -1094,6 +1096,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re NiceBytes(Params.Payload.GetSize()), "UPSTREAM"); + ZEN_ASSERT(Params.RequestIndex < Chunks.size()); Chunks[Params.RequestIndex] = std::move(Params.Payload); m_CacheStats.HitCount++; @@ -1109,7 +1112,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); } - CbPackage BatchResponse; + CbPackage RpcResponse; CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); @@ -1119,7 +1122,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (Chunks[ChunkIndex]) { ResponseObject << ChunkRequests[ChunkIndex].ChunkId; - BatchResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); } else { @@ -1128,10 +1131,10 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } ResponseObject.EndArray(); - BatchResponse.SetObject(ResponseObject.Save()); + RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; - BatchResponse.Save(MemStream); + RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, |