diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-19 22:34:00 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-05-19 22:34:00 +0200 |
| commit | 421701a530f7f69d0148e0f9ccd45b31b0801d6d (patch) | |
| tree | f85199c897ad5942d25a6a4edb88c9b238797be2 | |
| parent | Merge pull request #100 from EpicGames/de/fix-retry-logic-for-standalone-cach... (diff) | |
| parent | fix tests (diff) | |
| download | zen-421701a530f7f69d0148e0f9ccd45b31b0801d6d.tar.xz zen-421701a530f7f69d0148e0f9ccd45b31b0801d6d.zip | |
Merge pull request #99 from EpicGames/de/move-namespace-field
Keep Namespace out of CacheKey and store it on request level
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 49 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 231 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 17 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 24 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 3 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 103 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 26 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 47 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 16 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachekey.h | 10 |
10 files changed, 327 insertions, 199 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 0f4858bd5..a6e89e702 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1462,7 +1462,7 @@ TEST_CASE("zcache.rpc") { zen::IoHash KeyHash; ((uint32_t*)(KeyHash.Hash))[0] = Key; - const zen::CacheKey CacheKey = zen::CacheKey::Create(Namespace, Bucket, KeyHash); + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); CbPackage Package; CbWriter Writer; @@ -1474,6 +1474,7 @@ TEST_CASE("zcache.rpc") { CachePolicy BatchDefaultPolicy = CachePolicy::Default; Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer << "Namespace"sv << Namespace; Writer.BeginArray("Requests"sv); { AppendCacheRecord(Package, Writer, CacheKey, PayloadSize, BatchDefaultPolicy, CachePolicy::Default); @@ -1505,7 +1506,10 @@ TEST_CASE("zcache.rpc") bool Success; }; - auto GetCacheRecords = [](std::string_view BaseUri, std::span<zen::CacheKey> Keys, zen::CachePolicy Policy) -> GetCacheRecordResult { + auto GetCacheRecords = [](std::string_view BaseUri, + std::string_view Namespace, + std::span<zen::CacheKey> Keys, + zen::CachePolicy Policy) -> GetCacheRecordResult { using namespace zen; CbObjectWriter Request; @@ -1514,6 +1518,8 @@ TEST_CASE("zcache.rpc") Request.BeginObject("Params"sv); { Request << "DefaultPolicy"sv << WriteToString<128>(Policy); + Request << "Namespace"sv << Namespace; + Request.BeginArray("Requests"sv); for (const CacheKey& Key : Keys) { @@ -1563,9 +1569,7 @@ TEST_CASE("zcache.rpc") auto LoadKey = [](zen::CbFieldView KeyView) -> zen::CacheKey { if (zen::CbObjectView KeyObj = KeyView.AsObjectView()) { - return CacheKey::Create(KeyObj["Namespace"sv] ? KeyObj["Namespace"sv].AsString() : ""sv, - KeyObj["Bucket"sv].AsString(), - KeyObj["Hash"].AsHash()); + return CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); } return CacheKey::Empty; }; @@ -1582,8 +1586,8 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); - GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -1593,11 +1597,9 @@ TEST_CASE("zcache.rpc") CbObjectView RecordObj = RecordView.AsObjectView(); CbObjectView KeyObj = RecordObj["Key"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(KeyObj["Namespace"sv] ? KeyObj["Namespace"sv].AsString() : ""sv, - KeyObj["Bucket"sv].AsString(), - KeyObj["Hash"].AsHash()); - IoHash AttachmentHash; - size_t NumValues = 0; + const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); + IoHash AttachmentHash; + size_t NumValues = 0; for (CbFieldView Value : RecordObj["Values"sv]) { AttachmentHash = Value.AsObjectView()["RawHash"sv].AsHash(); @@ -1623,16 +1625,16 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); std::vector<zen::CacheKey> Keys; for (const zen::CacheKey& Key : ExistingKeys) { Keys.push_back(Key); - Keys.push_back(CacheKey::Create("missing"sv, "missing"sv, IoHash::Zero)); + Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); } - GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -1677,10 +1679,10 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); CachePolicy Policy = CachePolicy::QueryLocal; - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -1702,10 +1704,10 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4); CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); - GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -1736,7 +1738,7 @@ TEST_CASE("zcache.rpc.allpolicies") std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; std::string_view TestBucket = "allpoliciestest"sv; - std::string_view TestNamespace = ""sv; + std::string_view TestNamespace = "ue4.ddc"sv; // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) @@ -1838,7 +1840,7 @@ TEST_CASE("zcache.rpc.allpolicies") IoHash KeyHash = KeyWriter.GetHash(); KeyData& KeyData = KeyDatas[KeyIndex]; - KeyData.Key = CacheKey::Create(TestNamespace, TestBucket, KeyHash); + KeyData.Key = CacheKey::Create(TestBucket, KeyHash); KeyData.KeyIndex = KeyIndex; KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; @@ -1937,6 +1939,7 @@ TEST_CASE("zcache.rpc.allpolicies") { CachePolicy BatchDefaultPolicy = CachePolicy::Default; Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer << "Namespace"sv << TestNamespace; Writer.BeginArray("Requests"sv); for (CachePutRequest& Request : PutRequests) { @@ -1992,6 +1995,7 @@ TEST_CASE("zcache.rpc.allpolicies") { CachePolicy BatchDefaultPolicy = CachePolicy::Default; Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer << "Namespace"sv << TestNamespace; Writer.BeginArray("Requests"sv); for (CachePutValueRequest& Request : PutValueRequests) { @@ -2047,6 +2051,7 @@ TEST_CASE("zcache.rpc.allpolicies") { CachePolicy BatchDefaultPolicy = CachePolicy::Default; Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer << "Namespace"sv << TestNamespace; Writer.BeginArray("Requests"sv); for (CacheGetRequest& Request : GetRequests) { @@ -2158,6 +2163,7 @@ TEST_CASE("zcache.rpc.allpolicies") { CachePolicy BatchDefaultPolicy = CachePolicy::Default; Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer << "Namespace"sv << TestNamespace; Writer.BeginArray("Requests"sv); for (CacheGetValueRequest& Request : GetValueRequests) { @@ -2252,6 +2258,7 @@ TEST_CASE("zcache.rpc.allpolicies") { CachePolicy BatchDefaultPolicy = CachePolicy::Default; Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer << "Namespace"sv << TestNamespace; Writer.BeginArray("ChunkRequests"sv); for (CacheGetChunkRequest& Request : ChunkRequests) { diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index bc6f31dd3..135572d07 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -70,6 +70,7 @@ struct AttachmentCount struct PutRequestData { + std::string Namespace; CacheKey Key; CbObjectView RecordObject; CacheRecordPolicy Policy; @@ -244,30 +245,27 @@ namespace { } } - bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) + std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params) { - CbFieldView NamespaceField = KeyView["Namespace"sv]; - std::optional<std::string> Namespace; + CbFieldView NamespaceField = Params["Namespace"sv]; if (!NamespaceField) { - Namespace = ZenCacheStore::DefaultNamespace; + return std::string(ZenCacheStore::DefaultNamespace); } - else + + if (NamespaceField.HasError()) { - if (NamespaceField.HasError()) - { - return false; - } - if (!NamespaceField.IsString()) - { - return false; - } - Namespace = GetValidNamespaceName(NamespaceField.AsString()); + return {}; } - if (!Namespace.has_value()) + if (!NamespaceField.IsString()) { - return false; + return {}; } + return GetValidNamespaceName(NamespaceField.AsString()); + } + + bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) + { CbFieldView BucketField = KeyView["Bucket"sv]; if (BucketField.HasError()) { @@ -292,7 +290,7 @@ namespace { return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Namespace, *Bucket, Hash); + Key = CacheKey::Create(*Bucket, Hash); return true; } @@ -596,7 +594,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); if (GetUpstreamCacheResult UpstreamResult = - m_UpstreamCache.GetCacheRecord({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, AcceptType); + m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; @@ -769,7 +767,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); @@ -819,7 +817,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -904,7 +903,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -946,7 +946,7 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) @@ -1124,8 +1124,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { @@ -1139,7 +1144,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req return Request.WriteResponse(HttpResponseCode::BadRequest); } CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); @@ -1203,7 +1208,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack else { ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Key.Namespace, + Request.Namespace, Request.Key.Bucket, Request.Key.Hash, ToString(HttpContentType::kCbPackage), @@ -1225,7 +1230,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack } ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", - Request.Key.Namespace, + Request.Namespace, Request.Key.Bucket, Request.Key.Hash, NiceBytes(TransferredSize), @@ -1237,14 +1242,16 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); + m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}); } return PutResult::Success; } @@ -1277,8 +1284,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt bool UsedUpstream = false; }; - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } std::vector<RecordRequestData> Requests; std::vector<size_t> UpstreamIndexes; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); @@ -1322,7 +1334,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ZenCacheValue RecordCacheValue; if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && - m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) + m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { Request.RecordCacheValue = std::move(RecordCacheValue.Value); if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) @@ -1436,7 +1448,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } } - const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -1453,7 +1465,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt Request.RecordObject = ObjectBuffer; if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } ParseValues(Request); Request.UsedUpstream = true; @@ -1493,7 +1505,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt { ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", Value.ContentId, - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash); } @@ -1510,7 +1522,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } }; - m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete)); } CbPackage ResponsePackage; @@ -1533,7 +1545,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } ZEN_DEBUG("HIT - '{}/{}/{}' {}{}{}", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, NiceBytes(Request.RecordCacheValue.Size()), @@ -1549,11 +1561,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash); + ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { - ZEN_DEBUG("MISS - '{}/{}/{}' {}", Key.Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); + ZEN_DEBUG("MISS - '{}/{}/{}' {}", *Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); m_CacheStats.MissCount++; } } @@ -1579,8 +1591,13 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { @@ -1615,21 +1632,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ { IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); - m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Value}); + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; } else { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", Key.Namespace, Key.Bucket, Key.Hash, RawHash); + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); return Request.WriteResponse(HttpResponseCode::BadRequest); } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; - if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, ExistingValue) && + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; @@ -1640,11 +1657,11 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, NiceBytes(TransferredSize), @@ -1679,9 +1696,15 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + struct RequestData { CacheKey Key; @@ -1717,7 +1740,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); } @@ -1725,7 +1748,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), @@ -1740,12 +1763,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, "LOCAL"sv, @@ -1763,13 +1786,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http for (size_t Index : RemoteRequestIndexes) { RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({{Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash}}); + RequestedRecordsData.push_back({Request.Key.Bucket, Request.Key.Hash}); CacheChunkRequests.push_back(&RequestedRecordsData.back()); } Stopwatch Timer; m_UpstreamCache.GetCacheValues( + *Namespace, CacheChunkRequests, - [this, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { + [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { CacheChunkRequest& ChunkRequest = Params.Request; if (Params.Value) { @@ -1783,9 +1807,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive // for us to keep the data only on the upstream server. // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - ChunkRequest.Key.Namespace, + *Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, NiceBytes(Request.Result.GetCompressed().GetSize()), @@ -1797,7 +1821,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http } } ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - ChunkRequest.Key.Namespace, + *Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, "UPSTREAM"sv, @@ -1888,6 +1912,7 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); + std::string Namespace; std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream @@ -1897,27 +1922,28 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) + if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); + GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheValues(ValueRequests, UpstreamChunks); + GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally - GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); + GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - WriteGetCacheChunksResponse(Requests, HttpRequest); + WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); } bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests, @@ -1929,11 +1955,20 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + + std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params); + if (!NamespaceText) + { + ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); + return false; + } + Namespace = *NamespaceText; + + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, // we will need to change the pointers to indexes to handle reallocations. @@ -1996,11 +2031,9 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque } else { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{}/{} came after {}/{}/{}.", - RequestKey.Key.Namespace, + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", RequestKey.Key.Bucket, RequestKey.Key.Hash, - PreviousRecordKey->Key.Namespace, PreviousRecordKey->Key.Bucket, PreviousRecordKey->Key.Hash); return false; @@ -2022,7 +2055,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque } void -HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, +HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) @@ -2041,7 +2075,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(RecordKey.Key.Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) + if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); @@ -2058,7 +2092,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -2076,10 +2110,10 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); + m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; - m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } std::vector<CacheChunkRequest*> UpstreamPayloadRequests; @@ -2163,7 +2197,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& } void -HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, +HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) { using namespace cache::detail; @@ -2173,7 +2208,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(Request->Key->Key.Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { @@ -2207,7 +2242,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk } void -HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, +HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests) { @@ -2215,7 +2251,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { + const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; @@ -2242,7 +2278,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest } else { - m_CacheStore.Put(Key.Key.Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + m_CacheStore.Put(Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) @@ -2259,12 +2295,13 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest) { using namespace cache::detail; @@ -2290,7 +2327,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai } ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})", - Request.Key->Key.Namespace, + Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, @@ -2301,19 +2338,11 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai } else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { - ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", - Request.Key->Key.Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId); + ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { - ZEN_DEBUG("MISS - '{}/{}/{}/{}'", - Request.Key->Key.Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId); + ZEN_DEBUG("MISS - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } @@ -2409,8 +2438,8 @@ TEST_CASE("z$service.parse.relative.Uri") CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); HttpRequestData V2DefaultNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("default", V2DefaultNamespaceRequest)); - CHECK(V2DefaultNamespaceRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); + CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); @@ -2423,8 +2452,8 @@ TEST_CASE("z$service.parse.relative.Uri") CHECK(!V2NamespaceRequest.ValueContentId.has_value()); HttpRequestData V2BucketRequestWithDefaultNamespace; - CHECK(HttpRequestParseRelativeUri("default/test", V2BucketRequestWithDefaultNamespace)); - CHECK(V2BucketRequestWithDefaultNamespace.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); + CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); @@ -2437,7 +2466,7 @@ TEST_CASE("z$service.parse.relative.Uri") CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); HttpRequestData V2HashKeyRequest; - CHECK(HttpRequestParseRelativeUri("default/test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); CHECK(V2HashKeyRequest.Bucket == "test"); CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 5f248edd1..890a2ebab 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -118,7 +118,8 @@ private: PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ - bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + bool ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests, @@ -126,18 +127,24 @@ private: std::vector<cache::detail::ChunkRequest*>& ValueRequests, CbObjectView RpcRequest); /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + void GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); + void GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + void GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests); /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest); + void WriteGetCacheChunksResponse(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests, + zen::HttpServerRequest& HttpRequest); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 4edc13b4a..da948fd72 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2128,6 +2128,8 @@ ZenCacheDiskLayer::TotalSize() const //////////////////////////// ZenCacheStore +static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; + ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) { CreateDirectories(BasePath); @@ -2150,11 +2152,13 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size()); - if (std::find(Namespaces.begin(), Namespaces.end(), DefaultNamespace) == Namespaces.end()) + if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) { - ZEN_INFO("Moving #{} legacy buckets to anonymous namespace", LegacyBuckets.size()); + // default (unspecified) and ue4-ddc namespace points to the same namespace instance + + ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName); - std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, DefaultNamespace); + std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder @@ -2169,8 +2173,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message()); } } - - Namespaces.push_back(std::string(DefaultNamespace)); + Namespaces.push_back(std::string(UE4DDCNamespaceName)); } for (const std::string& NamespaceName : Namespaces) @@ -2237,6 +2240,13 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) { return It->second.get(); } + if (Namespace == DefaultNamespace) + { + if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) + { + return It->second.get(); + } + } return nullptr; } @@ -2249,6 +2259,10 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names Namespaces.reserve(m_Namespaces.size()); for (const auto& Entry : m_Namespaces) { + if (Entry.first == DefaultNamespace) + { + continue; + } Namespaces.push_back({Entry.first, *Entry.second}); } } diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 232e8b9a8..34b8d18f4 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -356,7 +356,8 @@ private: class ZenCacheStore final : public GcStorage, public GcContributor { public: - static constexpr std::string_view DefaultNamespace = "default"; + static constexpr std::string_view DefaultNamespace = + "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given static constexpr std::string_view NamespaceDiskPrefix = "ns_"; ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 52513abe9..98b4439c7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -182,7 +182,7 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -191,11 +191,11 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace); + std::string_view DdcNamespace = GetActualDdcNamespace(Session, Namespace); Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) @@ -299,7 +299,9 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); @@ -314,7 +316,7 @@ namespace detail { if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); CloudCacheResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); @@ -351,14 +353,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -383,7 +385,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); @@ -399,7 +402,7 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); Payload = BlobResult.Response; @@ -446,7 +449,7 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); if (m_UseLegacyDdc) { Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); @@ -484,6 +487,7 @@ namespace detail { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, @@ -503,6 +507,7 @@ namespace detail { { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, RecordValue, MaxAttempts, @@ -548,6 +553,7 @@ namespace detail { PutUpstreamCacheResult PerformStructuredPut( CloudCacheSession& Session, + std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, @@ -556,7 +562,7 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { @@ -738,14 +744,14 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -769,20 +775,24 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheRecords"; + << "GetCacheRecords"sv; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("Requests"sv); for (CacheKeyRequest* Request : Requests) { @@ -791,7 +801,6 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { - BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -848,14 +857,16 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); + const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -879,7 +890,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); @@ -887,12 +899,16 @@ namespace detail { CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheChunks"; + << "GetCacheChunks"sv; + BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("ChunkRequests"sv); { for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -902,7 +918,6 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); - BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,7 +1057,11 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + PackagePayload, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1061,12 +1080,14 @@ namespace detail { CbPackage BatchPackage; CbObjectWriter BatchWriter; BatchWriter << "Method"sv - << "PutCacheValues"; + << "PutCacheValues"sv; BatchWriter.BeginObject("Params"sv); { // DefaultPolicy unspecified and expected to be Default + BatchWriter << "Namespace"sv << CacheRecord.Namespace; + BatchWriter.BeginArray("Requests"sv); { BatchWriter.BeginObject(); @@ -1074,7 +1095,6 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { - BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1108,7 +1128,8 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + Result = Session.PutCacheValue(CacheRecord.Namespace, + CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheRecord.ValueContentIds[Idx], Values[Idx]); @@ -1131,7 +1152,11 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1259,7 +1284,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1278,7 +1303,7 @@ public: GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(CacheKey, Type); + Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); } Stats.CacheGetCount.Increment(1); @@ -1306,7 +1331,9 @@ public: return {}; } - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1334,7 +1361,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); @@ -1371,7 +1398,9 @@ public: } } - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); @@ -1399,7 +1428,7 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); @@ -1436,7 +1465,9 @@ public: } } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheValue"); @@ -1454,7 +1485,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); + Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1550,7 +1581,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1565,7 +1596,7 @@ private: else { ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", - CacheRecord.Key.Namespace, + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 6f18b3119..13548efc8 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -32,6 +32,7 @@ struct ZenStructuredCacheClientOptions; struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; + std::string Namespace; CacheKey Key; std::vector<IoHash> ValueContentIds; }; @@ -163,12 +164,15 @@ public: virtual UpstreamEndpointState GetState() = 0; virtual UpstreamEndpointStatus GetStatus() = 0; - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -196,11 +200,15 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 1ac4afe5c..efc75b5b4 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -408,10 +408,15 @@ ZenStructuredCacheSession::CheckHealth() } ZenCacheResult -ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) +ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -432,10 +437,18 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) +ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -457,10 +470,19 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash } ZenCacheResult -ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) +ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -485,10 +507,19 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) +ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index f70d9d06f..e8590f940 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -128,10 +128,18 @@ public: ~ZenStructuredCacheSession(); ZenCacheResult CheckHealth(); - ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); - ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); - ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); - ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); + ZenCacheResult GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type); + ZenCacheResult GetCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); + ZenCacheResult PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type); + ZenCacheResult PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); ZenCacheResult InvokeRpc(const CbPackage& Package); diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h index 427c99435..9adde8fc7 100644 --- a/zenutil/include/zenutil/cache/cachekey.h +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -12,21 +12,13 @@ namespace zen { struct CacheKey { - std::string Namespace; std::string Bucket; IoHash Hash; - static CacheKey Create(std::string_view Namespace, std::string_view Bucket, const IoHash& Hash) - { - return {.Namespace = ToLower(Namespace), .Bucket = ToLower(Bucket), .Hash = Hash}; - } + static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; } auto operator<=>(const CacheKey& that) const { - if (auto n = caseSensitiveCompareStrings(Namespace, that.Namespace); n != std::strong_ordering::equal) - { - return n; - } if (auto b = caseSensitiveCompareStrings(Bucket, that.Bucket); b != std::strong_ordering::equal) { return b; |