aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2022-03-08 15:45:09 +0100
committerPer Larsson <[email protected]>2022-03-08 15:45:09 +0100
commitf3a7c097352d5adf30b0bfc0a03cf88ae4744acb (patch)
tree707161c18534884f5745216b8b1b1855f4dac575
parentAdded streaming version of GetCacheRecords. (diff)
downloadzen-f3a7c097352d5adf30b0bfc0a03cf88ae4744acb.tar.xz
zen-f3a7c097352d5adf30b0bfc0a03cf88ae4744acb.zip
Added streaming version of GetCacheChunks.
-rw-r--r--zenhttp/include/zenhttp/websocket.h1
-rw-r--r--zenhttp/websocketasio.cpp41
-rw-r--r--zenserver/cache/structuredcache.cpp112
3 files changed, 137 insertions, 17 deletions
diff --git a/zenhttp/include/zenhttp/websocket.h b/zenhttp/include/zenhttp/websocket.h
index 7ec0a8555..88b9ad641 100644
--- a/zenhttp/include/zenhttp/websocket.h
+++ b/zenhttp/include/zenhttp/websocket.h
@@ -154,6 +154,7 @@ protected:
virtual void RegisterHandlers(WebSocketServer& Server) = 0;
void SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationId, CbPackage&& StreamResponse);
+ void SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationId, CbObject&& StreamResponse);
void SendStreamCompleteResponse(WebSocketId SocketId, uint32_t CorrelationId);
WebSocketServer& SocketServer()
diff --git a/zenhttp/websocketasio.cpp b/zenhttp/websocketasio.cpp
index 5a47d0ea9..9b64cc0a5 100644
--- a/zenhttp/websocketasio.cpp
+++ b/zenhttp/websocketasio.cpp
@@ -659,22 +659,22 @@ WsServer::Run()
m_Acceptor->set_option(asio::socket_base::send_buffer_size(SendBufferSize));
#if ZEN_PLATFORM_WINDOWS
- // On Windows, loopback connections can take advantage of a faster code path optionally with this flag.
- // This must be used by both the client and server side, and is only effective in the absence of
- // Windows Filtering Platform (WFP) callouts which can be installed by security software.
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path
- SOCKET NativeSocket = m_Acceptor->native_handle();
- int LoopbackOptionValue = 1;
- DWORD OptionNumberOfBytesReturned = 0;
- WSAIoctl(NativeSocket,
- SIO_LOOPBACK_FAST_PATH,
- &LoopbackOptionValue,
- sizeof(LoopbackOptionValue),
- NULL,
- 0,
- &OptionNumberOfBytesReturned,
- 0,
- 0);
+ // On Windows, loopback connections can take advantage of a faster code path optionally with this flag.
+ // This must be used by both the client and server side, and is only effective in the absence of
+ // Windows Filtering Platform (WFP) callouts which can be installed by security software.
+ // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path
+ SOCKET NativeSocket = m_Acceptor->native_handle();
+ int LoopbackOptionValue = 1;
+ DWORD OptionNumberOfBytesReturned = 0;
+ WSAIoctl(NativeSocket,
+ SIO_LOOPBACK_FAST_PATH,
+ &LoopbackOptionValue,
+ sizeof(LoopbackOptionValue),
+ NULL,
+ 0,
+ &OptionNumberOfBytesReturned,
+ 0,
+ 0);
#endif
asio::error_code Ec;
@@ -1590,6 +1590,15 @@ WebSocketService::SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationI
}
void
+WebSocketService::SendStreamResponse(WebSocketId SocketId, uint32_t CorrelationId, CbObject&& StreamResponse)
+{
+ CbPackage Response;
+ Response.SetObject(std::move(StreamResponse));
+
+ SendStreamResponse(SocketId, CorrelationId, std::move(Response));
+}
+
+void
WebSocketService::SendStreamCompleteResponse(WebSocketId SocketId, uint32_t CorrelationId)
{
WebSocketMessage Message;
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index bb4b67797..d469e3c68 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -167,6 +167,7 @@ HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server)
Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this);
Server.RegisterRequestHandler("GetCacheValues"sv, *this);
Server.RegisterRequestHandler("GetCacheRecords"sv, *this);
+ Server.RegisterRequestHandler("GetCacheChunks"sv, *this);
}
bool
@@ -324,7 +325,6 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage
std::vector<CacheKeyRequest> UpstreamRequests;
CbArrayView RequestsView = Params["Requests"sv].AsArrayView();
- const uint64_t RequestCount = RequestsView.Num();
for (int32_t Idx = 0; CbFieldView RequestField : RequestsView)
{
@@ -526,6 +526,116 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage
return true;
}
+ if (Method == "GetCacheChunks"sv)
+ {
+ ZEN_TRACE_CPU("Z$::WS_GetCacheChunks");
+
+ auto GetCidFromValueId = [](const Oid& ValueId, CbObjectView Record, uint64_t& OutRawSize) -> IoHash {
+ CbArrayView Values = Record["Values"sv].AsArrayView();
+
+ for (CbFieldView Value : Values)
+ {
+ CbObjectView ValueObject = Value.AsObjectView();
+ if (ValueObject["Id"sv].AsObjectId() == ValueId)
+ {
+ OutRawSize = ValueObject["RawSize"sv].AsUInt64();
+ return ValueObject["RawHash"sv].AsHash();
+ }
+ }
+
+ return IoHash::Zero;
+ };
+
+ CacheKey CurrentKey;
+ IoBuffer CurrentRecordValue;
+ CompressedBuffer Compressed;
+
+ CbArrayView RequestsView = Params["Requests"sv].AsArrayView();
+
+ for (int32_t Idx = 0; CbFieldView RequestField : RequestsView)
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ const int32_t RequestIndex = Idx++;
+
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ const IoHash RawHash = RequestObject["ChunkId"sv].AsHash();
+ const Oid ValueId = RequestObject["ValueId"sv].AsObjectId();
+ const uint64_t RequestedRawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ const uint64_t RequestedRawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+
+ IoHash Cid = RawHash;
+ uint64_t RawSize = 0;
+
+ if (RawHash == IoHash::Zero)
+ {
+ if (CurrentKey != Key || CurrentRecordValue.GetSize() == 0)
+ {
+ ZenCacheValue RecordCacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue))
+ {
+ CurrentRecordValue = RecordCacheValue.Value;
+ CurrentKey = Key;
+ }
+ }
+
+ if (CurrentRecordValue)
+ {
+ Cid = GetCidFromValueId(ValueId, CbObjectView(CurrentRecordValue.GetData()), RawSize);
+ }
+ }
+
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Cid))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ RawSize = Compressed.GetRawSize();
+ }
+
+ if (Compressed || RawSize > 0)
+ {
+ CbPackage Response;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginObject("Result"sv);
+ ResponseObject.AddInteger("RequestIndex"sv, RequestIndex);
+ ResponseObject.AddHash("RawHash"sv, Cid);
+ ResponseObject.AddInteger("RawSize"sv, RawSize);
+ ResponseObject.EndObject();
+
+ Response.SetObject(ResponseObject.Save());
+
+ if (Compressed)
+ {
+ Response.AddAttachment(CbAttachment(std::move(Compressed)));
+ }
+
+ SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(Response));
+
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(RawSize),
+ ToString(ZenContentType::kCompressedBinary));
+ }
+ else
+ {
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginObject("Error"sv);
+ ResponseObject.AddInteger("RequestIndex"sv, RequestIndex);
+ ResponseObject.EndObject();
+
+ SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(ResponseObject.Save()));
+
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kCompressedBinary));
+ }
+ }
+
+ SendStreamCompleteResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId());
+
+ return true;
+ }
+
return false;
}