diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-12 12:18:21 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-05-12 12:18:21 +0200 |
| commit | 0fc71fc55feb0bfa98c1e6a24f8d8485859dfc70 (patch) | |
| tree | 2515a259075a46745e71b62abfadee293eefe9c8 | |
| parent | Merge pull request #92 from EpicGames/de/bucket-standalone-temp-file-cleanup (diff) | |
| parent | use string::compare in caseSensitiveCompareStrings (diff) | |
| download | zen-actions_updates.tar.xz zen-actions_updates.zip | |
Merge pull request #93 from EpicGames/de/namespaces-continuedv1.0.1.7actions_updates
De/namespaces continued
| -rw-r--r-- | zencore/include/zencore/string.h | 16 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 39 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 249 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 1 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 53 | ||||
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 20 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 106 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 58 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 108 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachekey.h | 41 |
10 files changed, 432 insertions, 259 deletions
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index 012ee73ee..92f567dae 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -9,6 +9,7 @@ #include <string.h> #include <charconv> #include <codecvt> +#include <compare> #include <concepts> #include <optional> #include <span> @@ -795,6 +796,21 @@ StrCaseCompare(const char* Lhs, const char* Rhs, int64_t Length = -1) #endif } +/** + * @brief + * Helper function to implement case sensitive spaceship operator for strings. + * MacOS clang version we use does not implement <=> for std::string + * @param Lhs string + * @param Rhs string + * @return std::strong_ordering indicating relationship between Lhs and Rhs + */ +inline auto +caseSensitiveCompareStrings(const std::string& Lhs, const std::string& Rhs) +{ + int r = Lhs.compare(Rhs); + return r == 0 ? std::strong_ordering::equal : r < 0 ? std::strong_ordering::less : std::strong_ordering::greater; +} + ////////////////////////////////////////////////////////////////////////// /** diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 7e9a36a81..0f4858bd5 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1451,16 +1451,18 @@ TEST_CASE("zcache.rpc") return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; - auto PutCacheRecords = - [&AppendCacheRecord, - &ToIoBuffer](std::string_view BaseUri, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024) -> std::vector<CacheKey> { + auto PutCacheRecords = [&AppendCacheRecord, &ToIoBuffer](std::string_view BaseUri, + std::string_view Namespace, + std::string_view Bucket, + size_t Num, + size_t PayloadSize = 1024) -> std::vector<CacheKey> { std::vector<zen::CacheKey> OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; ((uint32_t*)(KeyHash.Hash))[0] = Key; - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + const zen::CacheKey CacheKey = zen::CacheKey::Create(Namespace, Bucket, KeyHash); CbPackage Package; CbWriter Writer; @@ -1561,7 +1563,9 @@ TEST_CASE("zcache.rpc") auto LoadKey = [](zen::CbFieldView KeyView) -> zen::CacheKey { if (zen::CbObjectView KeyObj = KeyView.AsObjectView()) { - return CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); + return CacheKey::Create(KeyObj["Namespace"sv] ? KeyObj["Namespace"sv].AsString() : ""sv, + KeyObj["Bucket"sv].AsString(), + KeyObj["Hash"].AsHash()); } return CacheKey::Empty; }; @@ -1578,7 +1582,7 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -1589,9 +1593,11 @@ TEST_CASE("zcache.rpc") CbObjectView RecordObj = RecordView.AsObjectView(); CbObjectView KeyObj = RecordObj["Key"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); - IoHash AttachmentHash; - size_t NumValues = 0; + const CacheKey Key = CacheKey::Create(KeyObj["Namespace"sv] ? KeyObj["Namespace"sv].AsString() : ""sv, + KeyObj["Bucket"sv].AsString(), + KeyObj["Hash"].AsHash()); + IoHash AttachmentHash; + size_t NumValues = 0; for (CbFieldView Value : RecordObj["Values"sv]) { AttachmentHash = Value.AsObjectView()["RawHash"sv].AsHash(); @@ -1617,13 +1623,13 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "mastodon"sv, 128); + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); std::vector<zen::CacheKey> Keys; for (const zen::CacheKey& Key : ExistingKeys) { Keys.push_back(Key); - Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); + Keys.push_back(CacheKey::Create("missing"sv, "missing"sv, IoHash::Zero)); } GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); @@ -1671,7 +1677,7 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); CachePolicy Policy = CachePolicy::QueryLocal; GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); @@ -1696,7 +1702,7 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); CachePolicy Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); @@ -1728,8 +1734,9 @@ TEST_CASE("zcache.rpc.allpolicies") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; - std::string_view TestBucket = "allpoliciestest"sv; + std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; + std::string_view TestBucket = "allpoliciestest"sv; + std::string_view TestNamespace = ""sv; // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) @@ -1831,7 +1838,7 @@ TEST_CASE("zcache.rpc.allpolicies") IoHash KeyHash = KeyWriter.GetHash(); KeyData& KeyData = KeyDatas[KeyIndex]; - KeyData.Key = CacheKey::Create(TestBucket, KeyHash); + KeyData.Key = CacheKey::Create(TestNamespace, TestBucket, KeyHash); KeyData.KeyIndex = KeyIndex; KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0f16f6785..eed7a4420 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -140,7 +140,6 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference - return HandleCacheBucketRequest(Request, Key); } @@ -175,14 +174,18 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, case HttpVerb::kDelete: // Drop bucket - - if (m_CacheStore.DropBucket(ZenCacheStore::DefaultNamespace, Bucket)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else { - return Request.WriteResponse(HttpResponseCode::NotFound); + // TODO: Should add namespace to URI and handle if the namespace is missing for backwards compatability + std::string_view Namespace = ZenCacheStore::DefaultNamespace; + + if (m_CacheStore.DropBucket(Namespace, Bucket)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } } break; @@ -226,7 +229,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && - m_CacheStore.Get(ZenCacheStore::DefaultNamespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) + m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; ZenContentType ContentType = ClientResultValue.Value.GetContentType(); @@ -287,7 +290,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (LOCAL)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), @@ -306,7 +310,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote)) { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -324,7 +328,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); - if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); + if (GetUpstreamCacheResult UpstreamResult = + m_UpstreamCache.GetCacheRecord({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; @@ -340,7 +345,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (ValidationResult != CbValidateError::None) { Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid compact binary object from upstream", + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); @@ -351,7 +357,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success && StoreLocal) { - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) @@ -405,7 +411,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (StoreLocal) { - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); } BinaryWriter MemStream; @@ -434,14 +440,19 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request else { Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); } } } if (Success) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (UPSTREAM)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), @@ -463,7 +474,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } @@ -486,12 +497,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) { - ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); @@ -502,7 +513,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (ValidationResult != CbValidateError::None) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid compact binary", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } @@ -520,7 +535,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request TotalCount++; }); - ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)", + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), @@ -529,14 +545,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ValidAttachments.size()); Body.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Key = {Ref.BucketSegment, Ref.HashKey}, + .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -548,7 +564,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (!Package.TryLoad(Body)) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } CachePolicy Policy = PolicyFromURL; @@ -578,7 +594,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } else { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), @@ -599,7 +616,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } - ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), @@ -612,14 +630,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Key = {Ref.BucketSegment, Ref.HashKey}, + .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -661,7 +679,7 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) @@ -679,7 +697,8 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}' in {}", + ZEN_DEBUG("MISS - '{}/{}/{}/{}' '{}' in {}", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, @@ -689,7 +708,8 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, return Request.WriteResponse(HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({}) in {}", + ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, @@ -747,7 +767,8 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); - ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({}) in {}", + ZEN_DEBUG("PUT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, @@ -772,8 +793,15 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return false; } + OutRef.Namespace = ToLower(ZenCacheStore::DefaultNamespace); // TODO: Should add namespace to URI and handle if the namespace is + // missing for backwards compatability OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); + if (!std::all_of(begin(OutRef.Namespace), end(OutRef.Namespace), [](const char c) { return std::isalnum(c); })) + { + return false; + } + if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { return false; @@ -907,12 +935,14 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); - CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyView["Bucket"sv]; - CbFieldView HashField = KeyView["Hash"sv]; - CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + CbFieldView NamespaceField = KeyView["Namespace"sv]; + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = + CacheKey::Create(NamespaceField.AsString(ZenCacheStore::DefaultNamespace), BucketField.AsString(), HashField.AsHash()); if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); @@ -981,7 +1011,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack } else { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, ToString(HttpContentType::kCbPackage), @@ -1002,7 +1033,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack return PutResult::Invalid; } - ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, NiceBytes(TransferredSize), @@ -1014,7 +1046,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); + m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; @@ -1080,13 +1112,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt for (CbFieldView RequestField : RequestsArray) { - RecordRequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyObject["Bucket"sv]; - CbFieldView HashField = KeyObject["Hash"sv]; - CacheKey& Key = Request.Upstream.Key; - Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + RecordRequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CbFieldView NamespaceField = KeyObject["Namespace"sv]; + CbFieldView BucketField = KeyObject["Bucket"sv]; + CbFieldView HashField = KeyObject["Hash"sv]; + CacheKey& Key = Request.Upstream.Key; + Key = CacheKey::Create(NamespaceField.AsString(ZenCacheStore::DefaultNamespace), BucketField.AsString(), HashField.AsHash()); if (HashField.HasError() || Key.Bucket.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -1100,7 +1133,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ZenCacheValue RecordCacheValue; if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && - m_CacheStore.Get(ZenCacheStore::DefaultNamespace, Key.Bucket, Key.Hash, RecordCacheValue)) + m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { Request.RecordCacheValue = std::move(RecordCacheValue.Value); if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) @@ -1231,7 +1264,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt Request.RecordObject = ObjectBuffer; if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); + m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } ParseValues(Request); Request.UsedUpstream = true; @@ -1269,7 +1302,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } else { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", Value.ContentId, Key.Bucket, Key.Hash); + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", + Value.ContentId, + Key.Namespace, + Key.Bucket, + Key.Hash); } } if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) @@ -1306,7 +1343,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } } - ZEN_DEBUG("HIT - '{}/{}' {}{}{}", + ZEN_DEBUG("HIT - '{}/{}/{}' {}{}{}", + Key.Namespace, Key.Bucket, Key.Hash, NiceBytes(Request.RecordCacheValue.Size()), @@ -1322,11 +1360,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash); } else { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); + ZEN_DEBUG("MISS - '{}/{}/{}' {}", Key.Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); m_CacheStats.MissCount++; } } @@ -1357,11 +1395,13 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyView["Bucket"sv]; - CbFieldView HashField = KeyView["Hash"sv]; - CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + CbFieldView NamespaceField = KeyView["Namespace"sv]; + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = + CacheKey::Create(NamespaceField.AsString(ZenCacheStore::DefaultNamespace), BucketField.AsString(), HashField.AsHash()); if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); @@ -1388,21 +1428,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ { IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Key.Bucket, Key.Hash, {.Value = Value}); + m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; } else { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}' FAILED, value is not compressed", Key.Bucket, Key.Hash, RawHash); + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", Key.Namespace, Key.Bucket, Key.Hash, RawHash); return Request.WriteResponse(HttpResponseCode::BadRequest); } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; - if (m_CacheStore.Get(ZenCacheStore::DefaultNamespace, Key.Bucket, Key.Hash, ExistingValue) && + if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; @@ -1416,7 +1456,12 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); } Results.push_back(Succeeded); - ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); + ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'", + Key.Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(TransferredSize), + Succeeded ? "Added"sv : "Invalid"); } if (Results.empty()) { @@ -1466,12 +1511,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { Stopwatch Timer; - RequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyObject["Bucket"sv]; - CbFieldView HashField = KeyObject["Hash"sv]; - Request.Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + RequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CbFieldView NamespaceField = KeyObject["Namespace"sv]; + CbFieldView BucketField = KeyObject["Bucket"sv]; + CbFieldView HashField = KeyObject["Hash"sv]; + Request.Key = + CacheKey::Create(NamespaceField.AsString(ZenCacheStore::DefaultNamespace), BucketField.AsString(), HashField.AsHash()); if (BucketField.HasError() || HashField.HasError() || Request.Key.Bucket.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -1486,15 +1533,15 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - if (m_CacheStore.Get(ZenCacheStore::DefaultNamespace, Key.Bucket, Key.Hash, CacheValue) && - IsCompressedBinary(CacheValue.Value.GetContentType())) + if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); } } if (Result) { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({}) in {}", + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + Key.Namespace, Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), @@ -1509,11 +1556,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash); } else { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}' ({}) in {}", + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + Key.Namespace, Key.Bucket, Key.Hash, "LOCAL"sv, @@ -1531,7 +1579,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http for (size_t Index : RemoteRequestIndexes) { RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({{Request.Key.Bucket, Request.Key.Hash}}); + RequestedRecordsData.push_back({{Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash}}); CacheChunkRequests.push_back(&RequestedRecordsData.back()); } Stopwatch Timer; @@ -1551,11 +1599,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive // for us to keep the data only on the upstream server. // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{Params.Value}); - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({}) in {}", + m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + ChunkRequest.Key.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, NiceBytes(Request.Result.GetCompressed().GetSize()), @@ -1566,7 +1612,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http return; } } - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}' ({}) in {}", + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + ChunkRequest.Key.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, "UPSTREAM"sv, @@ -1723,9 +1770,12 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque ChunkRequest& Request = Requests.emplace_back(); Request.Key = &RequestKey; - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbFieldView HashField = KeyObject["Hash"sv]; - RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CbFieldView HashField = KeyObject["Hash"sv]; + CbFieldView NamespaceField = KeyObject["Namespace"sv]; + RequestKey.Key = CacheKey::Create(NamespaceField.AsString(ZenCacheStore::DefaultNamespace), + KeyObject["Bucket"sv].AsString(), + HashField.AsHash()); if (RequestKey.Key.Bucket.empty() || HashField.HasError()) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); @@ -1767,9 +1817,11 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque } else { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{}/{} came after {}/{}/{}.", + RequestKey.Key.Namespace, RequestKey.Key.Bucket, RequestKey.Key.Hash, + PreviousRecordKey->Key.Namespace, PreviousRecordKey->Key.Bucket, PreviousRecordKey->Key.Hash); return false; @@ -1810,7 +1862,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(ZenCacheStore::DefaultNamespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) + if (m_CacheStore.Get(RecordKey.Key.Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); @@ -1845,7 +1897,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); + m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); @@ -1942,7 +1994,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(ZenCacheStore::DefaultNamespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + if (m_CacheStore.Get(Request->Key->Key.Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { @@ -2011,7 +2063,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest } else { - m_CacheStore.Put(ZenCacheStore::DefaultNamespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + m_CacheStore.Put(Key.Key.Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) @@ -2058,7 +2110,8 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai Writer.AddInteger("RawSize"sv, Request.TotalSize); } - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})", + Request.Key->Key.Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, @@ -2069,11 +2122,19 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai } else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { - ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); + ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", + Request.Key->Key.Namespace, + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId); } else { - ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); + ZEN_DEBUG("MISS - '{}/{}/{}/{}'", + Request.Key->Key.Namespace, + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId); m_CacheStats.MissCount++; } } diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 00c4260aa..8285d517d 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -80,6 +80,7 @@ public: private: struct CacheRef { + std::string Namespace; std::string BucketSegment; IoHash HashKey; IoHash ValueContentId; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index ce55b24b6..c21945702 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -3021,6 +3021,59 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } +TEST_CASE("z$.namespaces") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; + const auto CustomNamespace = "mynamespace"sv; + + // Create a cache record + const IoHash Key = CreateKey(42); + CbObject CacheValue = CreateCacheValue(4096); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue); + + ZenCacheValue GetValue; + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue)); + + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + + // This should just be dropped for now until we decide how we add namespaces + Zcs.Put(CustomNamespace, Bucket, Key, PutValue); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + + const IoHash Key2 = CreateKey(43); + CbObject CacheValue2 = CreateCacheValue(4096); + + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); + ZenCacheValue PutValue2 = {.Value = Buffer2}; + Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); + + CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); + } +} + TEST_CASE("z$.blocked.disklayer.put") { ScopedTemporaryDirectory TempDir; diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index dbf86cc13..2ec24b303 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -198,7 +198,8 @@ namespace detail { } { - PutRefResult RefResult = StorageSession.PutRef("requests"sv, + PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(), + "requests"sv, UpstreamData.TaskId, UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), ZenContentType::kCbObject); @@ -292,7 +293,7 @@ namespace detail { std::set<IoHash> Keys; std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys); Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), @@ -309,7 +310,7 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); + CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash)); Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -339,7 +340,7 @@ namespace detail { std::set<IoHash> Keys; std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys); Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), @@ -356,7 +357,8 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); + CloudCacheResult Result = + Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -691,7 +693,8 @@ namespace detail { std::map<IoHash, IoBuffer> BinaryData; { - CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + CloudCacheResult ObjectRefResult = + Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject); Log().debug("Get ref {} Bytes={} Duration={}s Result={}", ResultHash, ObjectRefResult.Bytes, @@ -718,7 +721,8 @@ namespace detail { std::set<IoHash> NeededData; if (OutputHash != IoHash::Zero) { - GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash); + GetObjectReferencesResult ObjectReferenceResult = + Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash); Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", ResultHash, ObjectReferenceResult.References.size(), @@ -748,7 +752,7 @@ namespace detail { { continue; } - CloudCacheResult BlobResult = Session.GetBlob(Hash); + CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash); Log().debug("Get blob {} Bytes={} Duration={}s Result={}", Hash, BlobResult.Bytes, diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 4bec41a29..ddc6c49d2 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -83,12 +83,12 @@ CloudCacheSession::Authenticate() } CloudCacheResult -CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) +CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key) { ZEN_TRACE_CPU("HordeClient::GetDerivedData"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -115,19 +115,18 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke } CloudCacheResult -CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) +CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { - return GetDerivedData(BucketId, Key.ToHexString()); + return GetDerivedData(Namespace, BucketId, Key.ToHexString()); } CloudCacheResult -CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -155,10 +154,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte } CloudCacheResult -CloudCacheSession::GetBlob(const IoHash& Key) +CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -187,12 +186,12 @@ CloudCacheSession::GetBlob(const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetCompressedBlob(const IoHash& Key) +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -220,12 +219,12 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetObject(const IoHash& Key) +CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObject"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -253,14 +252,14 @@ CloudCacheSession::GetObject(const IoHash& Key) } CloudCacheResult -CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) +CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { ZEN_TRACE_CPU("HordeClient::PutDerivedData"); IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -289,13 +288,13 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke } CloudCacheResult -CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) +CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) { - return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); + return PutDerivedData(Namespace, BucketId, Key.ToHexString(), DerivedData); } PutRefResult -CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("HordeClient::PutRef"); @@ -304,8 +303,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -356,13 +354,13 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer } FinalizeRefResult -CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("HordeClient::FinalizeRef"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" + << RefHash.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -414,12 +412,12 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con } CloudCacheResult -CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) +CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -446,12 +444,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) } CloudCacheResult -CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -478,12 +476,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) } CloudCacheResult -CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) +CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("HordeClient::PutObject"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -510,13 +508,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) } CloudCacheResult -CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) +CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::RefExists"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -541,13 +538,12 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) } GetObjectReferencesResult -CloudCacheSession::GetObjectReferences(const IoHash& Key) +CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() - << "/references"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -585,39 +581,39 @@ CloudCacheSession::GetObjectReferences(const IoHash& Key) } CloudCacheResult -CloudCacheSession::BlobExists(const IoHash& Key) +CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("blobs"sv, Key); + return CacheTypeExists(Namespace, "blobs"sv, Key); } CloudCacheResult -CloudCacheSession::CompressedBlobExists(const IoHash& Key) +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("compressed-blobs"sv, Key); + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); } CloudCacheResult -CloudCacheSession::ObjectExists(const IoHash& Key) +CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("objects"sv, Key); + return CacheTypeExists(Namespace, "objects"sv, Key); } CloudCacheExistsResult -CloudCacheSession::BlobExists(const std::set<IoHash>& Keys) +CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("blobs"sv, Keys); + return CacheTypeExists(Namespace, "blobs"sv, Keys); } CloudCacheExistsResult -CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys) +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("compressed-blobs"sv, Keys); + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); } CloudCacheExistsResult -CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) +CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("objects"sv, Keys); + return CacheTypeExists(Namespace, "objects"sv, Keys); } CloudCacheResult @@ -685,11 +681,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t } std::vector<IoHash> -CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); + Uri << "/api/v1/s/" << Namespace; ZEN_UNUSED(BucketId, ChunkHashes); @@ -715,12 +711,12 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) } CloudCacheResult -CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -745,7 +741,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) } CloudCacheExistsResult -CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) { ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); @@ -758,7 +754,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas Body << "]"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exist"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -894,8 +890,8 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken( CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) : m_Log(zen::logging::Get("jupiter")) , m_ServiceUrl(Options.ServiceUrl) -, m_DdcNamespace(Options.DdcNamespace) -, m_BlobStoreNamespace(Options.BlobStoreNamespace) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index cff9a9ef1..3d9e6ea7b 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -95,38 +95,40 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); - CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); - CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); - CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType); - CloudCacheResult GetBlob(const IoHash& Key); - CloudCacheResult GetCompressedBlob(const IoHash& Key); - CloudCacheResult GetObject(const IoHash& Key); + CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key); + CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); - CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); - CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); - CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); + CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); - FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); + CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); - GetObjectReferencesResult GetObjectReferences(const IoHash& Key); + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); - CloudCacheResult BlobExists(const IoHash& Key); - CloudCacheResult CompressedBlobExists(const IoHash& Key); - CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key); - CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys); - CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); - CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); CloudCacheResult PostComputeTasks(IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); - std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + CloudCacheClient& Client() { return *m_CacheClient; }; private: inline spdlog::logger& Log() { return m_Log; } @@ -134,9 +136,9 @@ private: CloudCacheAccessToken GetAccessToken(bool RefreshToken = false); bool VerifyAccessToken(long StatusCode); - CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key); + CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); - CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys); + CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; @@ -189,8 +191,8 @@ public: ~CloudCacheClient(); CloudCacheAccessToken AcquireAccessToken(); - std::string_view DdcNamespace() const { return m_DdcNamespace; } - std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } std::string_view ComputeCluster() const { return m_ComputeCluster; } std::string_view ServiceUrl() const { return m_ServiceUrl; } @@ -199,8 +201,8 @@ public: private: spdlog::logger& m_Log; std::string m_ServiceUrl; - std::string m_DdcNamespace; - std::string m_BlobStoreNamespace; + std::string m_DefaultDdcNamespace; + std::string m_DefaultBlobStoreNamespace; std::string m_ComputeCluster; std::chrono::milliseconds m_ConnectTimeout{}; std::chrono::milliseconds m_Timeout{}; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c870e0773..52513abe9 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -160,6 +160,24 @@ namespace detail { } } + std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace) + { + if (Namespace == ZenCacheStore::DefaultNamespace) + { + return Session.Client().DefaultDdcNamespace(); + } + return Namespace; + } + + std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace) + { + if (Namespace == ZenCacheStore::DefaultNamespace) + { + return Session.Client().DefaultBlobStoreNamespace(); + } + return Namespace; + } + virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } @@ -173,13 +191,16 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace); + Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); if (Result.Success) { @@ -190,23 +211,22 @@ namespace detail { IoBuffer ContentBuffer; int NumAttachments = 0; - CacheRecord.IterateAttachments( - [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); - Result.Bytes += AttachmentResult.Bytes; - Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; - Result.ErrorCode = AttachmentResult.ErrorCode; + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + Result.Bytes += AttachmentResult.Bytes; + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) - { - Result.Response = AttachmentResult.Response; - ++NumAttachments; - } - else - { - Result.Success = false; - } - }); + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Result.Response = AttachmentResult.Response; + ++NumAttachments; + } + else + { + Result.Success = false; + } + }); if (NumAttachments != 1) { Result.Success = false; @@ -217,7 +237,7 @@ namespace detail { else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType); if (Result.Success && Type == ZenContentType::kCbPackage) { @@ -228,8 +248,8 @@ namespace detail { { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); - CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -294,7 +314,9 @@ namespace detail { if (!Result.Error) { - CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + CloudCacheResult RefResult = + Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -305,8 +327,8 @@ namespace detail { if (ValidationResult == CbValidateError::None) { Record = LoadCompactBinaryObject(RefResult.Response); - Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Record.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -329,14 +351,15 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -376,8 +399,9 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); - Payload = BlobResult.Response; + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); + Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -422,13 +446,18 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace); if (m_UseLegacyDdc) { - Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); + Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); } else { - Result = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); + Result = Session.PutRef(BlobStoreNamespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + ZenContentType::kBinary); } } @@ -527,7 +556,8 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace); + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { IoBuffer BlobBuffer; @@ -539,7 +569,7 @@ namespace detail { CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); + BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -560,7 +590,7 @@ namespace detail { PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { - RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -581,7 +611,7 @@ namespace detail { } const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -599,7 +629,7 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -761,6 +791,7 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { + BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -871,6 +902,7 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); + BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,6 +1074,7 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { + BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1517,7 +1550,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(ZenCacheStore::DefaultNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1531,7 +1564,8 @@ private: } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", + CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h index aa649b4dc..427c99435 100644 --- a/zenutil/include/zenutil/cache/cachekey.h +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -12,33 +12,32 @@ namespace zen { struct CacheKey { + std::string Namespace; std::string Bucket; IoHash Hash; - static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; } - - static const CacheKey Empty; -}; + static CacheKey Create(std::string_view Namespace, std::string_view Bucket, const IoHash& Hash) + { + return {.Namespace = ToLower(Namespace), .Bucket = ToLower(Bucket), .Hash = Hash}; + } -inline bool -operator==(const CacheKey& A, const CacheKey& B) -{ - return A.Bucket == B.Bucket && A.Hash == B.Hash; -} + auto operator<=>(const CacheKey& that) const + { + if (auto n = caseSensitiveCompareStrings(Namespace, that.Namespace); n != std::strong_ordering::equal) + { + return n; + } + if (auto b = caseSensitiveCompareStrings(Bucket, that.Bucket); b != std::strong_ordering::equal) + { + return b; + } + return Hash <=> that.Hash; + } -inline bool -operator!=(const CacheKey& A, const CacheKey& B) -{ - return A.Bucket != B.Bucket || A.Hash != B.Hash; -} + auto operator==(const CacheKey& that) const { return (*this <=> that) == std::strong_ordering::equal; } -inline bool -operator<(const CacheKey& A, const CacheKey& B) -{ - const std::string& BucketA = A.Bucket; - const std::string& BucketB = B.Bucket; - return BucketA == BucketB ? A.Hash < B.Hash : BucketA < BucketB; -} + static const CacheKey Empty; +}; struct CacheChunkRequest { |