aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2022-03-08 15:45:09 +0100
committerPer Larsson <[email protected]>2022-03-08 15:45:09 +0100
commitf3a7c097352d5adf30b0bfc0a03cf88ae4744acb (patch)
tree707161c18534884f5745216b8b1b1855f4dac575 /zenserver/cache/structuredcache.cpp
parentAdded streaming version of GetCacheRecords. (diff)
downloadzen-f3a7c097352d5adf30b0bfc0a03cf88ae4744acb.tar.xz
zen-f3a7c097352d5adf30b0bfc0a03cf88ae4744acb.zip
Added streaming version of GetCacheChunks.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp112
1 files changed, 111 insertions, 1 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index bb4b67797..d469e3c68 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -167,6 +167,7 @@ HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server)
Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this);
Server.RegisterRequestHandler("GetCacheValues"sv, *this);
Server.RegisterRequestHandler("GetCacheRecords"sv, *this);
+ Server.RegisterRequestHandler("GetCacheChunks"sv, *this);
}
bool
@@ -324,7 +325,6 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage
std::vector<CacheKeyRequest> UpstreamRequests;
CbArrayView RequestsView = Params["Requests"sv].AsArrayView();
- const uint64_t RequestCount = RequestsView.Num();
for (int32_t Idx = 0; CbFieldView RequestField : RequestsView)
{
@@ -526,6 +526,116 @@ HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage
return true;
}
+ if (Method == "GetCacheChunks"sv)
+ {
+ ZEN_TRACE_CPU("Z$::WS_GetCacheChunks");
+
+ auto GetCidFromValueId = [](const Oid& ValueId, CbObjectView Record, uint64_t& OutRawSize) -> IoHash {
+ CbArrayView Values = Record["Values"sv].AsArrayView();
+
+ for (CbFieldView Value : Values)
+ {
+ CbObjectView ValueObject = Value.AsObjectView();
+ if (ValueObject["Id"sv].AsObjectId() == ValueId)
+ {
+ OutRawSize = ValueObject["RawSize"sv].AsUInt64();
+ return ValueObject["RawHash"sv].AsHash();
+ }
+ }
+
+ return IoHash::Zero;
+ };
+
+ CacheKey CurrentKey;
+ IoBuffer CurrentRecordValue;
+ CompressedBuffer Compressed;
+
+ CbArrayView RequestsView = Params["Requests"sv].AsArrayView();
+
+ for (int32_t Idx = 0; CbFieldView RequestField : RequestsView)
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ const int32_t RequestIndex = Idx++;
+
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ const IoHash RawHash = RequestObject["ChunkId"sv].AsHash();
+ const Oid ValueId = RequestObject["ValueId"sv].AsObjectId();
+ const uint64_t RequestedRawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ const uint64_t RequestedRawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+
+ IoHash Cid = RawHash;
+ uint64_t RawSize = 0;
+
+ if (RawHash == IoHash::Zero)
+ {
+ if (CurrentKey != Key || CurrentRecordValue.GetSize() == 0)
+ {
+ ZenCacheValue RecordCacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue))
+ {
+ CurrentRecordValue = RecordCacheValue.Value;
+ CurrentKey = Key;
+ }
+ }
+
+ if (CurrentRecordValue)
+ {
+ Cid = GetCidFromValueId(ValueId, CbObjectView(CurrentRecordValue.GetData()), RawSize);
+ }
+ }
+
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Cid))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ RawSize = Compressed.GetRawSize();
+ }
+
+ if (Compressed || RawSize > 0)
+ {
+ CbPackage Response;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginObject("Result"sv);
+ ResponseObject.AddInteger("RequestIndex"sv, RequestIndex);
+ ResponseObject.AddHash("RawHash"sv, Cid);
+ ResponseObject.AddInteger("RawSize"sv, RawSize);
+ ResponseObject.EndObject();
+
+ Response.SetObject(ResponseObject.Save());
+
+ if (Compressed)
+ {
+ Response.AddAttachment(CbAttachment(std::move(Compressed)));
+ }
+
+ SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(Response));
+
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(RawSize),
+ ToString(ZenContentType::kCompressedBinary));
+ }
+ else
+ {
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginObject("Error"sv);
+ ResponseObject.AddInteger("RequestIndex"sv, RequestIndex);
+ ResponseObject.EndObject();
+
+ SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(ResponseObject.Save()));
+
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kCompressedBinary));
+ }
+ }
+
+ SendStreamCompleteResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId());
+
+ return true;
+ }
+
return false;
}