aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp552
1 files changed, 292 insertions, 260 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index e23030e24..3d5359188 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1537,433 +1537,465 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
-namespace GetCacheChunks::detail {
+namespace cache::detail {
- struct ValueData
+ struct RecordValue
{
Oid ValueId;
IoHash ContentId;
uint64_t RawSize;
};
- struct KeyRequestData
- {
- CacheKeyRequest Upstream;
- IoBuffer CacheValue;
- std::vector<ValueData> Values;
- CachePolicy DownstreamRecordPolicy;
- CachePolicy DownstreamPolicy;
- std::string_view Source;
- bool Exists = false;
- bool HasRequest = false;
- bool HasRecordRequest = false;
- bool HasValueRequest = false;
- bool ValuesRead = false;
+ struct Record
+ {
+ IoBuffer CacheValue;
+ std::vector<RecordValue> Values;
+ std::string_view Source;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool HasRequest = false;
+ bool ValuesRead = false;
};
- struct ChunkRequestData
- {
- CacheChunkRequest Upstream;
- KeyRequestData* KeyRequest;
- size_t KeyRequestIndex;
- CachePolicy DownstreamPolicy;
- CompressedBuffer Value;
- std::string_view Source;
- uint64_t TotalSize = 0;
- bool Exists = false;
- bool IsRecordRequest = false;
- bool TotalSizeKnown = false;
+ struct ChunkRequest
+ {
+ CacheChunkRequest* Key = nullptr;
+ Record* Record = nullptr;
+ CompressedBuffer Value;
+ std::string_view Source;
+ uint64_t TotalSize = 0;
+ uint64_t RequestedSize = 0;
+ uint64_t RequestedOffset = 0;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool TotalSizeKnown = false;
+ bool IsRecordRequest = false;
};
-} // namespace GetCacheChunks::detail
+} // namespace cache::detail
void
HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
- std::vector<KeyRequestData> KeyRequests;
- std::vector<ChunkRequestData> Chunks;
- if (!TryGetCacheChunks_Parse(KeyRequests, Chunks, RpcRequest))
+ std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
+ std::vector<Record> Records; // Scratch-space data about a Record when fulfilling RecordRequests
+ std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
+ std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
+ std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
+ std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
+ std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+
+ // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
+ if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
- GetCacheChunks_LoadKeys(KeyRequests);
- GetCacheChunks_LoadChunks(Chunks);
- GetCacheChunks_SendResults(Chunks, HttpRequest);
+
+ // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
+ // have it locally, and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks);
+
+ // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheValues(ValueRequests, UpstreamChunks);
+
+ // Call GetCacheChunks on the upstream for any payloads we do not have locally
+ GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests);
+
+ // Send the payload and descriptive data about each chunk to the client
+ WriteGetCacheChunksResponse(Requests, HttpRequest);
}
bool
-HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests,
- std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- CbObjectView RpcRequest)
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::Record>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+
+ // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
+ // we will need to change the pointers to indexes to handle reallocations.
+ RecordKeys.reserve(NumRequests);
+ Records.reserve(NumRequests);
+ RequestKeys.reserve(NumRequests);
+ Requests.reserve(NumRequests);
+ RecordRequests.reserve(NumRequests);
+ ValueRequests.reserve(NumRequests);
+
+ CacheKeyRequest* PreviousRecordKey = nullptr;
+ Record* PreviousRecord = nullptr;
- KeyRequestData* PreviousKeyRequest = nullptr;
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- Chunks.reserve(ChunkRequestsArray.Num());
for (CbFieldView RequestView : ChunkRequestsArray)
{
- ChunkRequestData& Chunk = Chunks.emplace_back();
- CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
+ ChunkRequest& Request = Requests.emplace_back();
+ Request.Key = &RequestKey;
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
CbFieldView HashField = KeyObject["Hash"sv];
- Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
- if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError())
+ RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
+ if (RequestKey.Key.Bucket.empty() || HashField.HasError())
{
ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
return false;
}
- KeyRequestData* KeyRequest = nullptr;
- if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key)
- {
- KeyRequest = &KeyRequests.emplace_back();
- KeyRequest->Upstream.Key = Chunk.Upstream.Key;
- PreviousKeyRequest = KeyRequest;
- }
- else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key))
+ RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
+ RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
+ RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ Request.RequestedSize = RequestKey.RawSize;
+ Request.RequestedOffset = RequestKey.RawOffset;
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ Request.IsRecordRequest = (bool)RequestKey.ValueId;
+
+ if (!Request.IsRecordRequest)
{
- KeyRequest = PreviousKeyRequest;
+ ValueRequests.push_back(&Request);
}
else
{
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- Chunk.Upstream.Key.Bucket,
- Chunk.Upstream.Key.Hash,
- PreviousKeyRequest->Upstream.Key.Bucket,
- PreviousKeyRequest->Upstream.Key.Hash);
- return false;
- }
- Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest);
-
- Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash();
- Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId();
- Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId;
+ RecordRequests.push_back(&Request);
+ CacheKeyRequest* RecordKey = nullptr;
+ Record* Record = nullptr;
- if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero)
- {
- KeyRequest->DownstreamPolicy =
- KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy;
- KeyRequest->HasRequest = true;
- (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true;
+ if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
+ {
+ RecordKey = &RecordKeys.emplace_back();
+ PreviousRecordKey = RecordKey;
+ Record = &Records.emplace_back();
+ PreviousRecord = Record;
+ RecordKey->Key = RequestKey.Key;
+ }
+ else if (RequestKey.Key == PreviousRecordKey->Key)
+ {
+ RecordKey = PreviousRecordKey;
+ Record = PreviousRecord;
+ }
+ else
+ {
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
+ RequestKey.Key.Bucket,
+ RequestKey.Key.Hash,
+ PreviousRecordKey->Key.Bucket,
+ PreviousRecordKey->Key.Hash);
+ return false;
+ }
+ Request.Record = Record;
+ if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
+ {
+ Record->DownstreamPolicy =
+ Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
+ Record->HasRequest = true;
+ }
}
}
- if (Chunks.empty())
+ if (Requests.empty())
{
return false;
}
- for (ChunkRequestData& Chunk : Chunks)
- {
- Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex];
- }
return true;
}
void
-HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests)
+HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::Record>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- std::vector<KeyRequestData*> UpstreamValueRequests;
- for (KeyRequestData& KeyRequest : KeyRequests)
+ for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{
- if (KeyRequest.HasRequest)
+ CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
+ Record& Record = Records[RecordIndex];
+ if (Record.HasRequest)
{
- if (KeyRequest.HasRecordRequest)
- {
- KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta;
- }
+ Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
- if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal))
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
{
- // There's currently no interface for checking only whether a CacheValue exists without loading it,
- // so we load it here even if SkipData is true and its a CacheValue request.
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
{
- KeyRequest.Exists = true;
- KeyRequest.CacheValue = std::move(CacheValue.Value);
- KeyRequest.Source = "LOCAL"sv;
+ Record.Exists = true;
+ Record.CacheValue = std::move(CacheValue.Value);
+ Record.Source = "LOCAL"sv;
}
}
- if (!KeyRequest.Exists)
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
{
- // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't
- // know which,
- // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both
- // kinds and pass the request that uses the one that succeeds.
- if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote))
- {
- KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy));
- UpstreamRecordRequests.push_back(&KeyRequest.Upstream);
- }
- if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- UpstreamValueRequests.push_back(&KeyRequest);
- }
+ RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
+ UpstreamRecordRequests.push_back(&RecordKey);
}
}
}
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
}
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ Record& Record = Records[RecordIndex];
- KeyRequestData& KeyRequest =
- *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream));
- const CacheKey& Key = KeyRequest.Upstream.Key;
- KeyRequest.Exists = true;
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject);
- KeyRequest.Source = "UPSTREAM"sv;
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = "UPSTREAM"sv;
- if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal))
+ if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
};
m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
- if (!UpstreamValueRequests.empty())
- {
- for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests)
- {
- KeyRequestData& KeyRequest = *KeyRequestPtr;
- CacheKey& Key = KeyRequest.Upstream.Key;
- GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary);
- if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
- {
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
- if (Result)
- {
- KeyRequest.CacheValue = std::move(UpstreamResult.Value);
- KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary);
- KeyRequest.Exists = true;
- KeyRequest.Source = "UPSTREAM"sv;
- // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
- // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
- // for us to keep the data only on the upstream server.
- // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
- }
- }
- }
- }
- }
-}
-
-void
-HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks)
-{
- using namespace GetCacheChunks::detail;
-
std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
- for (ChunkRequestData& Chunk : Chunks)
+ for (ChunkRequest* Request : RecordRequests)
{
- if (Chunk.IsRecordRequest)
+ if (Request->Key->ChunkId == IoHash::Zero)
{
- if (Chunk.Upstream.ChunkId == IoHash::Zero)
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, parse the cache record and try to find the raw hash from the ValueId.
+ Record& Record = *Request->Record;
+ if (!Record.ValuesRead)
{
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- KeyRequestData& KeyRequest = *Chunk.KeyRequest;
- if (!KeyRequest.ValuesRead)
+ Record.ValuesRead = true;
+ if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
{
- KeyRequest.ValuesRead = true;
- if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject)
+ CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ Record.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
{
- CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- KeyRequest.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
{
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
+ Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
}
}
}
+ }
- for (const ValueData& Value : KeyRequest.Values)
+ for (const RecordValue& Value : Record.Values)
+ {
+ if (Value.ValueId == Request->Key->ValueId)
{
- if (Value.ValueId == Chunk.Upstream.ValueId)
- {
- Chunk.Upstream.ChunkId = Value.ContentId;
- Chunk.TotalSize = Value.RawSize;
- Chunk.TotalSizeKnown = true;
- break;
- }
+ Request->Key->ChunkId = Value.ContentId;
+ Request->TotalSize = Value.RawSize;
+ Request->TotalSizeKnown = true;
+ break;
}
}
+ }
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Chunk.Upstream.ChunkId != IoHash::Zero)
+ // Now load the ContentId from the local ContentIdStore or from the upstream
+ if (Request->Key->ChunkId != IoHash::Zero)
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal))
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
{
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown)
+ if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
- if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId))
- {
- Chunk.Exists = true;
- Chunk.Source = "LOCAL"sv;
- }
+ Request->Exists = true;
+ Request->Source = "LOCAL"sv;
}
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId))
+ }
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (Compressed)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
- if (Compressed)
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
- {
- Chunk.Value = Compressed;
- }
- Chunk.Exists = true;
- Chunk.TotalSize = Compressed.GetRawSize();
- Chunk.TotalSizeKnown = true;
- Chunk.Source = "LOCAL"sv;
+ Request->Value = Compressed;
}
+ Request->Exists = true;
+ Request->TotalSize = Compressed.GetRawSize();
+ Request->TotalSizeKnown = true;
+ Request->Source = "LOCAL"sv;
}
}
- if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy);
- UpstreamPayloadRequests.push_back(&Chunk.Upstream);
- }
+ }
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
+ OutUpstreamChunks.push_back(Request->Key);
}
}
- else
+ }
+}
+
+void
+HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ using namespace cache::detail;
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (Chunk.KeyRequest->Exists)
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
{
- if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType()))
+ if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue));
- if (Compressed)
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ if (Result)
{
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
- Chunk.Value = Compressed;
+ Request->Value = Result;
}
- Chunk.Exists = true;
- Chunk.TotalSize = Compressed.GetRawSize();
- Chunk.TotalSizeKnown = true;
- Chunk.Source = Chunk.KeyRequest->Source;
- Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
+ Request->Exists = true;
+ Request->TotalSize = Result.GetRawSize();
+ Request->TotalSizeKnown = true;
+ Request->Source = "LOCAL"sv;
}
}
}
}
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
+ Request->Key->RawOffset = 0;
+ Request->Key->RawSize = UINT64_MAX;
+ }
+ OutUpstreamChunks.push_back(Request->Key);
+ }
}
+}
+
+void
+HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ using namespace cache::detail;
- if (!UpstreamPayloadRequests.empty())
+ if (!UpstreamChunks.empty())
{
- const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) {
+ const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
}
- ChunkRequestData& Chunk =
- *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream));
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ CacheChunkRequest& Key = Params.Request;
+ size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
+ ChunkRequest& Request = Requests[RequestIndex];
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
+ !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize)
+ if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
+ IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
{
return;
}
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal))
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ if (Request.IsRecordRequest)
+ {
+ m_CidStore.AddChunk(Compressed);
+ }
+ else
+ {
+ m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
+ }
}
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- Chunk.Value = std::move(Compressed);
+ Request.Value = std::move(Compressed);
}
}
- Chunk.Exists = true;
- Chunk.TotalSize = Params.RawSize;
- Chunk.TotalSizeKnown = true;
- Chunk.Source = "UPSTREAM"sv;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.TotalSize = Params.RawSize;
+ Request.TotalSizeKnown = true;
+ Request.Source = "UPSTREAM"sv;
m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete));
}
}
void
-HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- zen::HttpServerRequest& HttpRequest)
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests,
+ zen::HttpServerRequest& HttpRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
CbPackage RpcResponse;
CbObjectWriter Writer;
Writer.BeginArray("Result"sv);
- for (ChunkRequestData& Chunk : Chunks)
+ for (ChunkRequest& Request : Requests)
{
Writer.BeginObject();
{
- if (Chunk.Exists)
+ if (Request.Exists)
{
- Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId);
- if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
+ if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Chunk.Value));
+ RpcResponse.AddAttachment(CbAttachment(Request.Value));
}
else
{
- Writer.AddInteger("RawSize"sv, Chunk.TotalSize);
+ Writer.AddInteger("RawSize"sv, Request.TotalSize);
}
- ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})",
- Chunk.Upstream.Key.Bucket,
- Chunk.Upstream.Key.Hash,
- Chunk.Upstream.ValueId,
- NiceBytes(Chunk.TotalSize),
- Chunk.IsRecordRequest ? "Record"sv : "Value"sv,
- Chunk.Source);
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceBytes(Request.TotalSize),
+ Request.IsRecordRequest ? "Record"sv : "Value"sv,
+ Request.Source);
m_CacheStats.HitCount++;
}
- else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query))
+ else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
{
- ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
m_CacheStats.MissCount++;
}
}