diff options
| author | Per Larsson <[email protected]> | 2022-03-08 15:45:09 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2022-03-08 15:45:09 +0100 |
| commit | f3a7c097352d5adf30b0bfc0a03cf88ae4744acb (patch) | |
| tree | 707161c18534884f5745216b8b1b1855f4dac575 | |
| parent | Added streaming version of GetCacheRecords. (diff) | |
| download | zen-f3a7c097352d5adf30b0bfc0a03cf88ae4744acb.tar.xz zen-f3a7c097352d5adf30b0bfc0a03cf88ae4744acb.zip | |
Added streaming version of GetCacheChunks.
| -rw-r--r-- | zenhttp/include/zenhttp/websocket.h | 1 | ||||
| -rw-r--r-- | zenhttp/websocketasio.cpp | 41 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 112 |
3 files changed, 137 insertions, 17 deletions
diff --git a/zenhttp/include/zenhttp/websocket.h b/zenhttp/include/zenhttp/websocket.h index 7ec0a8555..88b9ad641 100644 --- a/zenhttp/include/zenhttp/websocket.h +++ b/zenhttp/include/zenhttp/websocket.h @@ -154,6 +154,7 @@ protected: virtual void RegisterHandlers(WebSocketServer& Server) = 0; void SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationId, CbPackage&& StreamResponse); + void SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationId, CbObject&& StreamResponse); void SendStreamCompleteResponse(WebSocketId SocketId, uint32_t CorrelationId); WebSocketServer& SocketServer() diff --git a/zenhttp/websocketasio.cpp b/zenhttp/websocketasio.cpp index 5a47d0ea9..9b64cc0a5 100644 --- a/zenhttp/websocketasio.cpp +++ b/zenhttp/websocketasio.cpp @@ -659,22 +659,22 @@ WsServer::Run() m_Acceptor->set_option(asio::socket_base::send_buffer_size(SendBufferSize)); #if ZEN_PLATFORM_WINDOWS - // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. - // This must be used by both the client and server side, and is only effective in the absence of - // Windows Filtering Platform (WFP) callouts which can be installed by security software. - // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path - SOCKET NativeSocket = m_Acceptor->native_handle(); - int LoopbackOptionValue = 1; - DWORD OptionNumberOfBytesReturned = 0; - WSAIoctl(NativeSocket, - SIO_LOOPBACK_FAST_PATH, - &LoopbackOptionValue, - sizeof(LoopbackOptionValue), - NULL, - 0, - &OptionNumberOfBytesReturned, - 0, - 0); + // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. + // This must be used by both the client and server side, and is only effective in the absence of + // Windows Filtering Platform (WFP) callouts which can be installed by security software. + // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path + SOCKET NativeSocket = m_Acceptor->native_handle(); + int LoopbackOptionValue = 1; + DWORD OptionNumberOfBytesReturned = 0; + WSAIoctl(NativeSocket, + SIO_LOOPBACK_FAST_PATH, + &LoopbackOptionValue, + sizeof(LoopbackOptionValue), + NULL, + 0, + &OptionNumberOfBytesReturned, + 0, + 0); #endif asio::error_code Ec; @@ -1590,6 +1590,15 @@ WebSocketService::SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationI } void +WebSocketService::SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationId, CbObject&& StreamResponse) +{ + CbPackage Response; + Response.SetObject(std::move(StreamResponse)); + + SendStreamResponse(SocketId, CorrelationId, std::move(Response)); +} + +void WebSocketService::SendStreamCompleteResponse(WebSocketId SocketId, uint32_t CorrelationId) { WebSocketMessage Message; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index bb4b67797..d469e3c68 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -167,6 +167,7 @@ HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server) Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this); Server.RegisterRequestHandler("GetCacheValues"sv, *this); Server.RegisterRequestHandler("GetCacheRecords"sv, *this); + Server.RegisterRequestHandler("GetCacheChunks"sv, *this); } bool @@ -324,7 +325,6 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage std::vector<CacheKeyRequest> UpstreamRequests; CbArrayView RequestsView = Params["Requests"sv].AsArrayView(); - const uint64_t RequestCount = RequestsView.Num(); for (int32_t Idx = 0; CbFieldView RequestField : RequestsView) { @@ -526,6 +526,116 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage return true; } + if (Method == "GetCacheChunks"sv) + { + ZEN_TRACE_CPU("Z$::WS_GetCacheChunks"); + + auto GetCidFromValueId = [](const Oid& ValueId, CbObjectView Record, uint64_t& OutRawSize) -> IoHash { + CbArrayView Values = Record["Values"sv].AsArrayView(); + + for (CbFieldView Value : Values) + { + CbObjectView ValueObject = Value.AsObjectView(); + if (ValueObject["Id"sv].AsObjectId() == ValueId) + { + OutRawSize = ValueObject["RawSize"sv].AsUInt64(); + return ValueObject["RawHash"sv].AsHash(); + } + } + + return IoHash::Zero; + }; + + CacheKey CurrentKey; + IoBuffer CurrentRecordValue; + CompressedBuffer Compressed; + + CbArrayView RequestsView = Params["Requests"sv].AsArrayView(); + + for (int32_t Idx = 0; CbFieldView RequestField : RequestsView) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + const int32_t RequestIndex = Idx++; + + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash RawHash = RequestObject["ChunkId"sv].AsHash(); + const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); + const uint64_t RequestedRawOffset = RequestObject["RawOffset"sv].AsUInt64(); + const uint64_t RequestedRawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + + IoHash Cid = RawHash; + uint64_t RawSize = 0; + + if (RawHash == IoHash::Zero) + { + if (CurrentKey != Key || CurrentRecordValue.GetSize() == 0) + { + ZenCacheValue RecordCacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) + { + CurrentRecordValue = RecordCacheValue.Value; + CurrentKey = Key; + } + } + + if (CurrentRecordValue) + { + Cid = GetCidFromValueId(ValueId, CbObjectView(CurrentRecordValue.GetData()), RawSize); + } + } + + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Cid)) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + RawSize = Compressed.GetRawSize(); + } + + if (Compressed || RawSize > 0) + { + CbPackage Response; + CbObjectWriter ResponseObject; + + ResponseObject.BeginObject("Result"sv); + ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); + ResponseObject.AddHash("RawHash"sv, Cid); + ResponseObject.AddInteger("RawSize"sv, RawSize); + ResponseObject.EndObject(); + + Response.SetObject(ResponseObject.Save()); + + if (Compressed) + { + Response.AddAttachment(CbAttachment(std::move(Compressed))); + } + + SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(Response)); + + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Key.Bucket, + Key.Hash, + NiceBytes(RawSize), + ToString(ZenContentType::kCompressedBinary)); + } + else + { + CbObjectWriter ResponseObject; + + ResponseObject.BeginObject("Error"sv); + ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); + ResponseObject.EndObject(); + + SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(ResponseObject.Save())); + + ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kCompressedBinary)); + } + } + + SendStreamCompleteResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId()); + + return true; + } + return false; } |