diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 112 |
1 files changed, 111 insertions, 1 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index bb4b67797..d469e3c68 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -167,6 +167,7 @@ HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server) Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this); Server.RegisterRequestHandler("GetCacheValues"sv, *this); Server.RegisterRequestHandler("GetCacheRecords"sv, *this); + Server.RegisterRequestHandler("GetCacheChunks"sv, *this); } bool @@ -324,7 +325,6 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage std::vector<CacheKeyRequest> UpstreamRequests; CbArrayView RequestsView = Params["Requests"sv].AsArrayView(); - const uint64_t RequestCount = RequestsView.Num(); for (int32_t Idx = 0; CbFieldView RequestField : RequestsView) { @@ -526,6 +526,116 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage return true; } + if (Method == "GetCacheChunks"sv) + { + ZEN_TRACE_CPU("Z$::WS_GetCacheChunks"); + + auto GetCidFromValueId = [](const Oid& ValueId, CbObjectView Record, uint64_t& OutRawSize) -> IoHash { + CbArrayView Values = Record["Values"sv].AsArrayView(); + + for (CbFieldView Value : Values) + { + CbObjectView ValueObject = Value.AsObjectView(); + if (ValueObject["Id"sv].AsObjectId() == ValueId) + { + OutRawSize = ValueObject["RawSize"sv].AsUInt64(); + return ValueObject["RawHash"sv].AsHash(); + } + } + + return IoHash::Zero; + }; + + CacheKey CurrentKey; + IoBuffer CurrentRecordValue; + CompressedBuffer Compressed; + + CbArrayView RequestsView = Params["Requests"sv].AsArrayView(); + + for (int32_t Idx = 0; CbFieldView RequestField : RequestsView) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + const int32_t RequestIndex = Idx++; + + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash RawHash = RequestObject["ChunkId"sv].AsHash(); + const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); + const uint64_t RequestedRawOffset = RequestObject["RawOffset"sv].AsUInt64(); + const uint64_t RequestedRawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + + IoHash Cid = RawHash; + uint64_t RawSize = 0; + + if (RawHash == IoHash::Zero) + { + if (CurrentKey != Key || CurrentRecordValue.GetSize() == 0) + { + ZenCacheValue RecordCacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) + { + CurrentRecordValue = RecordCacheValue.Value; + CurrentKey = Key; + } + } + + if (CurrentRecordValue) + { + Cid = GetCidFromValueId(ValueId, CbObjectView(CurrentRecordValue.GetData()), RawSize); + } + } + + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Cid)) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + RawSize = Compressed.GetRawSize(); + } + + if (Compressed || RawSize > 0) + { + CbPackage Response; + CbObjectWriter ResponseObject; + + ResponseObject.BeginObject("Result"sv); + ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); + ResponseObject.AddHash("RawHash"sv, Cid); + ResponseObject.AddInteger("RawSize"sv, RawSize); + ResponseObject.EndObject(); + + Response.SetObject(ResponseObject.Save()); + + if (Compressed) + { + Response.AddAttachment(CbAttachment(std::move(Compressed))); + } + + SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(Response)); + + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Key.Bucket, + Key.Hash, + NiceBytes(RawSize), + ToString(ZenContentType::kCompressedBinary)); + } + else + { + CbObjectWriter ResponseObject; + + ResponseObject.BeginObject("Error"sv); + ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); + ResponseObject.EndObject(); + + SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(ResponseObject.Save())); + + ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kCompressedBinary)); + } + } + + SendStreamCompleteResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId()); + + return true; + } + return false; } |