diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 564 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 19 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 138 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 24 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 8 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 4 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 31 |
7 files changed, 531 insertions, 257 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 8854ff3d1..facb29e1d 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -15,6 +15,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zenhttp/httpserver.h> +#include <zenhttp/httpshared.h> #include <zenstore/cas.h> #include <zenutil/cache/cache.h> @@ -55,6 +56,13 @@ struct AttachmentCount uint32_t Total = 0; }; +struct PutRequestData +{ + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -134,13 +142,13 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); - if (Ref.PayloadId == IoHash::Zero) + if (Ref.ValueContentId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref, PolicyFromURL); } else { - return HandleCachePayloadRequest(Request, Ref, PolicyFromURL); + return HandleCacheValueRequest(Request, Ref, PolicyFromURL); } return; @@ -202,12 +210,15 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request const ZenContentType AcceptType = Request.AcceptContentType(); const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); - const bool QueryUpstream = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote); bool Success = false; - ZenCacheValue LocalCacheValue; + ZenCacheValue ClientResultValue; + if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } - if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue)) + if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; @@ -216,15 +227,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CbPackage Package; uint32_t MissingCount = 0; - CbObjectView CacheRecord(LocalCacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1; } else { - MissingCount++; + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + MissingCount++; + } } }); @@ -232,13 +250,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { - Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value)); + Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); BinaryWriter MemStream; Package.Save(MemStream); - LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } } @@ -248,21 +266,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Ref.BucketSegment, Ref.HashKey, - NiceBytes(LocalCacheValue.Value.Size()), - ToString(LocalCacheValue.Value.GetContentType())); + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; - - if (SkipData) + if (SkipData && AcceptType == ZenContentType::kBinary) { return Request.WriteResponse(HttpResponseCode::OK); } else { - return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value); + // Other types handled SkipData when constructing the ClientResultValue + return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } - else if (!QueryUpstream) + else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote)) { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; @@ -272,9 +290,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work - Request.WriteResponseAsync([this, AcceptType, SkipData, PartialRecord, Ref](HttpServerRequest& AsyncRequest) { - bool Success = false; - ZenCacheValue UpstreamCacheValue; + Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) { + bool Success = false; + const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); + const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal); + const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); + ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); @@ -283,8 +305,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { Success = true; - UpstreamCacheValue.Value = UpstreamResult.Value; - UpstreamCacheValue.Value.SetContentType(AcceptType); + ClientResultValue.Value = UpstreamResult.Value; + ClientResultValue.Value.SetContentType(AcceptType); if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { @@ -299,30 +321,35 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request Ref.HashKey, ToString(AcceptType)); } + + // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data } - if (Success) + if (Success && StoreLocal) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; - if (Package.TryLoad(UpstreamCacheValue.Value)) + if (Package.TryLoad(ClientResultValue.Value)) { CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; - CacheRecord.IterateAttachments([this, &Package, &Ref, &Count](CbFieldView HashView) { + CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + if (StoreLocal) { - Count.New++; + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } } Count.Valid++; } @@ -335,10 +362,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request Count.Invalid++; } } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) + else if (QueryLocal) { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - Count.Valid++; + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + Count.Valid++; + } } Count.Total++; }); @@ -349,13 +379,24 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (StoreLocal) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + } BinaryWriter MemStream; - Package.Save(MemStream); + if (SkipData) + { + // Save a package containing only the object. + CbPackage(Package.GetObject()).Save(MemStream); + } + else + { + Package.Save(MemStream); + } - UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - UpstreamCacheValue.Value.SetContentType(ZenContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else { @@ -379,19 +420,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", Ref.BucketSegment, Ref.HashKey, - NiceBytes(UpstreamCacheValue.Value.Size()), - ToString(UpstreamCacheValue.Value.GetContentType())); + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; - if (SkipData) + if (SkipData && AcceptType == ZenContentType::kBinary) { AsyncRequest.WriteResponse(HttpResponseCode::OK); } else { - AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value); + // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally + // metadata. + AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else @@ -417,14 +460,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request Body.SetContentType(ContentType); - if (ContentType == HttpContentType::kBinary) + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) { ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); @@ -468,8 +511,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -550,8 +594,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -563,16 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: - HandleGetCachePayload(Request, Ref, PolicyFromURL); + HandleGetCacheValue(Request, Ref, PolicyFromURL); break; case HttpVerb::kPut: - HandlePutCachePayload(Request, Ref, PolicyFromURL); + HandlePutCacheValue(Request, Ref, PolicyFromURL); break; default: break; @@ -580,16 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request } void -HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); bool InUpstreamCache = false; CachePolicy Policy = PolicyFromURL; - const bool QueryUpstream = !Payload && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -603,9 +649,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques } } - if (!Payload) + if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -613,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - ToString(Payload.GetContentType()), + Ref.ValueContentId, + NiceBytes(Value.Size()), + ToString(Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); m_CacheStats.HitCount++; @@ -630,12 +676,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques } else { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void -HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromURL); @@ -656,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueId does not match attachment hash"sv); + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); @@ -666,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, + Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD"); @@ -695,13 +743,13 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } std::string_view HashSegment; - std::string_view PayloadSegment; + std::string_view ValueSegment; - std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); + std::string_view::size_type ValueSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return - if (PayloadSplitOffset == BucketSplitOffset) + if (ValueSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); @@ -709,8 +757,8 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& else { // Cache record + valueid lookup - HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); - PayloadSegment = Key.substr(PayloadSplitOffset + 1); + HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1); + ValueSegment = Key.substr(ValueSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) @@ -718,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) + if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength) { - const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash); if (!IsOk) { @@ -729,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } else { - OutRef.PayloadId = IoHash::Zero; + OutRef.ValueContentId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); @@ -752,27 +800,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); - if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync( - [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { - const std::string_view Method = RpcRequest["Method"sv].AsString(); - if (Method == "GetCacheRecords"sv) - { - HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); - } - else if (Method == "GetCacheValues"sv) - { - HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); - } - else - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - } - }); + Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == HttpContentType::kCbObject) + { + ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + const std::string_view Method = Object["Method"sv].AsString(); + if (Method == "PutCacheRecords"sv) + { + HandleRpcPutCacheRecords(AsyncRequest, Package); + } + else if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, Object); + } + else if (Method == "GetCacheValues"sv) + { + HandleRpcGetCacheValues(AsyncRequest, Object); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); } break; default: @@ -782,24 +847,154 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } void +HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + + if (Result == PutResult::Invalid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + Results.push_back(Result == PutResult::Success); + } + if (Results.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +HttpStructuredCacheService::PutResult +HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +{ + std::vector<IoHash> ValidAttachments; + AttachmentCount Count; + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + ValidAttachments.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + TransferredSize += Chunk.GetCompressedSize(); + } + else + { + ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return PutResult::Invalid; + } + + ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total); + + ZenCacheValue CacheValue; + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + } + return PutResult::Success; +} + +void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbPackage RpcResponse; - CacheRecordPolicy Policy; - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); std::vector<CacheKey> CacheKeys; std::vector<IoBuffer> CacheValues; std::vector<size_t> UpstreamRequests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - Policy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); - - const bool PartialRecord = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::PartialRecord); - const bool QueryRemote = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); - for (CbFieldView KeyView : Params["CacheKeys"sv]) { CbObjectView KeyObject = KeyView.AsObjectView(); @@ -816,44 +1011,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) { ZenCacheValue CacheValue; - uint32_t MissingCount = 0; + uint32_t MissingCount = 0; + uint32_t MissingReadFromUpstreamCount = 0; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { CbObjectView CacheRecord(CacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - ZEN_ASSERT(Chunk.GetSize() > 0); - RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - } - else - { - MissingCount++; - } - }); + CacheRecord.IterateAttachments( + [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) + { + // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we + // didn't ask for it and thus the record is complete in its absence. + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + MissingCount++; + } + } + else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } + } + else + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + ZEN_ASSERT(Chunk.GetSize() > 0); + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } + } + }); } - if (CacheValue.Value && (MissingCount == 0 || PartialRecord)) + if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) || + MissingReadFromUpstreamCount != 0) + { + UpstreamRequests.push_back(KeyIndex); + } + else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(CacheValue.Value.GetContentType()), - MissingCount ? "(PARTIAl)" : ""sv); + MissingCount ? "(PARTIAL)" : ""sv); CacheValues[KeyIndex] = std::move(CacheValue.Value); m_CacheStats.HitCount++; } - else if (QueryRemote) - { - UpstreamRequests.push_back(KeyIndex); - } else { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv); - m_CacheStats.MissCount++; + if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv); + m_CacheStats.MissCount++; + } } ++KeyIndex; @@ -861,7 +1096,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!UpstreamRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, PartialRecord](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) { ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); IoBuffer CacheValue; @@ -869,37 +1104,52 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (Params.Record) { - Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count](CbFieldView HashView) { - if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) + Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + bool FoundInUpstream = false; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { - if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + FoundInUpstream = true; + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - Count.New++; - } - Count.Valid++; + FoundInUpstream = true; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } + } + Count.Valid++; - RpcResponse.AddAttachment(CbAttachment(Compressed)); - } - else - { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", - HashView.AsHash(), - Params.Key.Bucket, - Params.Key.Hash); - Count.Invalid++; + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + RpcResponse.AddAttachment(CbAttachment(Compressed)); + } + } + else + { + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", + HashView.AsHash(), + Params.Key.Bucket, + Params.Key.Hash); + Count.Invalid++; + } } } - else if (m_CidStore.ContainsChunk(HashView.AsHash())) + if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) && + m_CidStore.ContainsChunk(HashView.AsHash())) { + // We added the attachment for this Value in the local loop before calling m_UpstreamCache Count.Valid++; } Count.Total++; }); - if ((Count.Valid == Count.Total) || PartialRecord) + if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) { CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); } @@ -917,9 +1167,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req Count.Total); CacheValue.SetContentType(ZenContentType::kCbObject); - CacheValues[Params.KeyIndex] = CacheValue; - m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + { + m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); + } m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -932,7 +1184,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } }; - m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete)); } CbObjectWriter ResponseObject; @@ -963,9 +1215,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { - ZEN_TRACE_CPU("Z$::RpcGetCachePayloads"); + ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); @@ -980,7 +1232,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); - const Oid PayloadId = RequestObject["ValueId"sv].AsObjectId(); + const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); std::string_view PolicyText = RequestObject["Policy"sv].AsString(); @@ -989,7 +1241,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re // Note we could use emplace_back here but [Apple] LLVM-12's C++ library // can't infer a constructor like other platforms (or can't handle an // initializer list like others do). - ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, ChunkPolicy}); + ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy}); } if (ChunkRequests.empty()) @@ -1002,8 +1254,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) // is missing, load the cache record and try to find the raw hash from the ValueId. { - const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { - if (PayloadId) + const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash { + if (ValueId) { // A valid ValueId indicates that the caller is searching for a Value in a Record // that was Put with ICacheStore::Put @@ -1012,7 +1264,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView ValueObject = ValueView.AsObjectView(); const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1022,7 +1274,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) { const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1033,7 +1285,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView AttachmentObject = AttachmentView.AsObjectView(); const Oid Id = AttachmentObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return AttachmentObject["RawHash"sv].AsHash(); } @@ -1071,7 +1323,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CurrentRecordBuffer) { - ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId); } } } @@ -1118,8 +1370,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (!UpstreamRequests.empty()) { - const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value))) { m_CidStore.AddChunk(Compressed); @@ -1127,11 +1379,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId, - NiceBytes(Params.Payload.GetSize()), + NiceBytes(Params.Value.GetSize()), "UPSTREAM"); ZEN_ASSERT(Params.RequestIndex < Chunks.size()); - Chunks[Params.RequestIndex] = std::move(Params.Payload); + Chunks[Params.RequestIndex] = std::move(Params.Value); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -1143,7 +1395,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } }; - m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete)); } CbPackage RpcResponse; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index a7ecba845..88bf6cda1 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -19,6 +19,7 @@ namespace zen { class CasStore; class CidStore; class CbObjectView; +struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; @@ -73,7 +74,7 @@ private: { std::string BucketSegment; IoHash HashKey; - IoHash PayloadId; + IoHash ValueContentId; }; struct CacheStats @@ -82,20 +83,28 @@ private: std::atomic_uint64_t UpstreamHitCount{}; std::atomic_uint64_t MissCount{}; }; + enum class PutResult + { + Success, + Fail, + Invalid, + }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleRpcRequest(zen::HttpServerRequest& Request); + void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 8b02a437a..657cfb729 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -270,14 +270,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -301,11 +301,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; @@ -325,7 +325,7 @@ namespace detail { m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); } - OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload}); } return Result; @@ -333,11 +333,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span<IoBuffer const> Payloads) override + std::span<IoBuffer const> Values) override { ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -371,30 +371,31 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { - for (const IoHash& PayloadId : PayloadIds) + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) { - const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + const auto It = + std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); - if (It == std::end(CacheRecord.PayloadIds)) + if (It == std::end(CacheRecord.ValueContentIds)) { - OutReason = fmt::format("value '{}' MISSING from local cache", PayloadId); + OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId); return false; } - const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (!BlobResult.Success) { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); return false; } @@ -685,14 +686,14 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); + const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -716,11 +717,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); std::vector<size_t> IndexMap; IndexMap.reserve(RequestIndex.size()); @@ -744,7 +745,7 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - BatchRequest.AddObjectId("ValueId"sv, Request.PayloadId); + BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); BatchRequest << "ChunkId"sv << Request.ChunkId; BatchRequest << "RawOffset"sv << Request.RawOffset; BatchRequest << "RawSize"sv << Request.RawSize; @@ -784,7 +785,7 @@ namespace detail { } } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -793,7 +794,7 @@ namespace detail { for (size_t Index : RequestIndex) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -801,11 +802,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span<IoBuffer const> Payloads) override + std::span<IoBuffer const> Values) override { ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -820,9 +821,9 @@ namespace detail { CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); - for (const IoBuffer& Payload : Payloads) + for (const IoBuffer& Value : Values) { - if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value))) { Package.AddAttachment(CbAttachment(AttachmentBuffer)); } @@ -848,15 +849,15 @@ namespace detail { } else { - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCachePayload(CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + CacheRecord.ValueContentIds[Idx], + Values[Idx]); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -866,7 +867,7 @@ namespace detail { if (!Result.Success) { - return {.Reason = "Failed to upload payload", + return {.Reason = "Failed to upload value", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; @@ -1046,7 +1047,7 @@ public: virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, + const CacheRecordPolicy& DownstreamPolicy, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1057,6 +1058,8 @@ public: if (m_Options.ReadUpstream) { + CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream(); + for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) @@ -1075,18 +1078,19 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + Result = + Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); @@ -1112,11 +1116,11 @@ public: } } - virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); @@ -1142,10 +1146,10 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) { - if (Params.Payload) + Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.Value) { - OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); Stats.CacheHitCount.Increment(1); } @@ -1163,7 +1167,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1175,13 +1179,13 @@ public: for (size_t Index : RemainingKeys) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::GetCachePayload"); + ZEN_TRACE_CPU("Upstream::GetCacheValue"); if (m_Options.ReadUpstream) { @@ -1197,7 +1201,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCachePayload(CacheKey, PayloadId); + Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1214,7 +1218,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1299,18 +1303,18 @@ private: return; } - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) { Payloads.push_back(Payload); } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, - PayloadId); + ValueContentId); return; } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 16d8c7929..2087b1fba 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -27,7 +27,7 @@ struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; CacheKey Key; - std::vector<IoHash> PayloadIds; + std::vector<IoHash> ValueContentIds; }; struct UpstreamCacheOptions @@ -73,14 +73,14 @@ struct CacheRecordGetCompleteParams using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; -struct CachePayloadGetCompleteParams +struct CacheValueGetCompleteParams { const CacheChunkRequest& Request; size_t RequestIndex{~size_t(0)}; - IoBuffer Payload; + IoBuffer Value; }; -using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>; +using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; struct UpstreamEndpointStats { @@ -156,11 +156,11 @@ public: const CacheRecordPolicy& Policy, OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -190,11 +190,11 @@ public: const CacheRecordPolicy& RecordPolicy, OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) = 0; + virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index cd7f48334..a2666ac02 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -433,10 +433,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId) +ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -486,10 +486,10 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload) +ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index c2be2165a..8cc4c121d 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -123,9 +123,9 @@ public: ZenCacheResult CheckHealth(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); - ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); + ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); - ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); + ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); private: diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index feb73c5f7..e246afc34 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -155,7 +155,7 @@ namespace utils { class ZenServer : public IHttpStatusProvider { public: - void Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) + int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) { m_UseSentry = ServerOptions.NoSentry == false; m_ServerEntry = ServerEntry; @@ -200,8 +200,8 @@ public: // Ok so now we're configured, let's kick things off - m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); - m_Http->Initialize(ServerOptions.BasePort); + m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); + int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); m_AuthService = std::make_unique<zen::HttpAuthService>(); m_Http->RegisterService(*m_AuthService); @@ -262,7 +262,7 @@ public: #if ZEN_ENABLE_MESH if (ServerOptions.MeshEnabled) { - StartMesh(BasePort); + StartMesh(EffectiveBasePort); } else { @@ -309,6 +309,8 @@ public: .Enabled = ServerOptions.GcConfig.Enabled, }; m_GcScheduler.Initialize(GcConfig); + + return EffectiveBasePort; } void InitializeState(const ZenServerOptions& ServerOptions); @@ -889,6 +891,20 @@ ZenEntryPoint::Run() Entry->AddSponsorProcess(ServerOptions.OwnerPid); } + ZenServer Server; + Server.SetDataRoot(ServerOptions.DataDir); + Server.SetContentRoot(ServerOptions.ContentDir); + Server.SetTestMode(ServerOptions.IsTest); + Server.SetDedicatedMode(ServerOptions.IsDedicated); + int EffectiveBasePort = Server.Initialize(ServerOptions, Entry); + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + ServerOptions.BasePort = EffectiveBasePort; + } + std::unique_ptr<std::thread> ShutdownThread; std::unique_ptr<zen::NamedEvent> ShutdownEvent; @@ -896,13 +912,6 @@ ZenEntryPoint::Run() ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); - ZenServer Server; - Server.SetDataRoot(ServerOptions.DataDir); - Server.SetContentRoot(ServerOptions.ContentDir); - Server.SetTestMode(ServerOptions.IsTest); - Server.SetDedicatedMode(ServerOptions.IsDedicated); - Server.Initialize(ServerOptions, Entry); - // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { |