diff options
| author | Stefan Boberg <[email protected]> | 2022-05-20 12:42:56 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2022-05-20 12:42:56 +0200 |
| commit | 5b271be0169b842cdc3d576e48bf0ddc2f122852 (patch) | |
| tree | 16f501d2190f19a7281ce3f30365817464e146cb /zenserver | |
| parent | Added ZEN_USE_CATCH2 define (diff) | |
| parent | fix mac compilation error (diff) | |
| download | zen-5b271be0169b842cdc3d576e48bf0ddc2f122852.tar.xz zen-5b271be0169b842cdc3d576e48bf0ddc2f122852.zip | |
Merge branch 'main' into use-catch2
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 946 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 54 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 2237 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 135 | ||||
| -rw-r--r-- | zenserver/compute/function.cpp | 23 | ||||
| -rw-r--r-- | zenserver/compute/function.h | 1 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 53 | ||||
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 20 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 106 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 58 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 22 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 3 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 187 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 26 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 47 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 16 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 1 |
17 files changed, 2949 insertions, 986 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 8ae531720..e11499289 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -35,6 +35,11 @@ #include <gsl/gsl-lite.hpp> +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif + namespace zen { using namespace std::literals; @@ -65,11 +70,232 @@ struct AttachmentCount struct PutRequestData { + std::string Namespace; CacheKey Key; CbObjectView RecordObject; CacheRecordPolicy Policy; }; +namespace { + static constexpr std::string_view HttpZCacheRPCPrefix = "$rpc"sv; + + struct HttpRequestData + { + std::optional<std::string> Namespace; + std::optional<std::string> Bucket; + std::optional<IoHash> HashKey; + std::optional<IoHash> ValueContentId; + }; + + const char* ValidNamespaceNameCharacters = "abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + const char* ValidBucketNameCharacters = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::optional<std::string> GetValidNamespaceName(std::string_view Name) + { + if (Name.empty()) + { + return {}; + } + if (Name.length() > 64) + { + return {}; + } + + if (Name.find_first_not_of(ValidNamespaceNameCharacters) != std::string::npos) + { + return {}; + } + return ToLower(Name); + } + + std::optional<std::string> GetValidBucketName(std::string_view Name) + { + if (Name.empty()) + { + return {}; + } + if (Name.find_first_not_of(ValidBucketNameCharacters) != std::string::npos) + { + return {}; + } + return ToLower(Name); + } + + std::optional<IoHash> GetValidIoHash(std::string_view Hash) + { + if (Hash.length() != IoHash::StringLength) + { + return {}; + } + + IoHash KeyHash; + if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash)) + { + return {}; + } + return KeyHash; + } + + bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data) + { + std::vector<std::string_view> Tokens; + uint32_t TokenCount = zen::ForEachStrTok(Key, '/', [&](const std::string_view& Token) { + Tokens.push_back(Token); + return true; + }); + + switch (TokenCount) + { + case 1: + Data.Namespace = GetValidNamespaceName(Tokens[0]); + return Data.Namespace.has_value(); + case 2: + { + std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); + if (PossibleHashKey.has_value()) + { + // Legacy bucket/key request + Data.Bucket = GetValidBucketName(Tokens[0]); + if (!Data.Bucket.has_value()) + { + return false; + } + Data.HashKey = PossibleHashKey; + Data.Namespace = ZenCacheStore::DefaultNamespace; + return true; + } + Data.Namespace = GetValidNamespaceName(Tokens[0]); + if (!Data.Namespace.has_value()) + { + return false; + } + Data.Bucket = GetValidBucketName(Tokens[1]); + if (!Data.Bucket.has_value()) + { + return false; + } + return true; + } + case 3: + { + std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); + if (PossibleHashKey.has_value()) + { + // Legacy bucket/key/valueid request + Data.Bucket = GetValidBucketName(Tokens[0]); + if (!Data.Bucket.has_value()) + { + return false; + } + Data.HashKey = PossibleHashKey; + Data.ValueContentId = GetValidIoHash(Tokens[2]); + if (!Data.ValueContentId.has_value()) + { + return false; + } + Data.Namespace = ZenCacheStore::DefaultNamespace; + return true; + } + Data.Namespace = GetValidNamespaceName(Tokens[0]); + if (!Data.Namespace.has_value()) + { + return false; + } + Data.Bucket = GetValidBucketName(Tokens[1]); + if (!Data.Bucket.has_value()) + { + return false; + } + Data.HashKey = GetValidIoHash(Tokens[2]); + if (!Data.HashKey) + { + return false; + } + return true; + } + case 4: + { + Data.Namespace = GetValidNamespaceName(Tokens[0]); + if (!Data.Namespace.has_value()) + { + return false; + } + + Data.Bucket = GetValidBucketName(Tokens[1]); + if (!Data.Bucket.has_value()) + { + return false; + } + + Data.HashKey = GetValidIoHash(Tokens[2]); + if (!Data.HashKey.has_value()) + { + return false; + } + + Data.ValueContentId = GetValidIoHash(Tokens[3]); + if (!Data.ValueContentId.has_value()) + { + return false; + } + return true; + } + default: + return false; + } + } + + std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params) + { + CbFieldView NamespaceField = Params["Namespace"sv]; + if (!NamespaceField) + { + return std::string(ZenCacheStore::DefaultNamespace); + } + + if (NamespaceField.HasError()) + { + return {}; + } + if (!NamespaceField.IsString()) + { + return {}; + } + return GetValidNamespaceName(NamespaceField.AsString()); + } + + bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) + { + CbFieldView BucketField = KeyView["Bucket"sv]; + if (BucketField.HasError()) + { + return false; + } + if (!BucketField.IsString()) + { + return false; + } + std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString()); + if (!Bucket.has_value()) + { + return false; + } + CbFieldView HashField = KeyView["Hash"sv]; + if (HashField.HasError()) + { + return false; + } + if (!HashField.IsHash()) + { + return false; + } + IoHash Hash = HashField.AsHash(); + Key = CacheKey::Create(*Bucket, Hash); + return true; + } + +} // namespace + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -124,45 +350,55 @@ HttpStructuredCacheService::Scrub(ScrubContext& Ctx) void HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { - CacheRef Ref; - metrics::OperationTiming::Scope $(m_HttpRequests); - if (!ValidateKeyUri(Request, /* out */ Ref)) + std::string_view Key = Request.RelativeUri(); + if (Key == HttpZCacheRPCPrefix) { - std::string_view Key = Request.RelativeUri(); - - if (Key == "$rpc") - { - return HandleRpcRequest(Request); - } - - if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) - { - // Bucket reference - - return HandleCacheBucketRequest(Request, Key); - } + return HandleRpcRequest(Request); + } + HttpRequestData RequestData; + if (!HttpRequestParseRelativeUri(Key, RequestData)) + { return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } - CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); + if (RequestData.ValueContentId.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + ZEN_ASSERT(RequestData.Bucket.has_value()); + ZEN_ASSERT(RequestData.HashKey.has_value()); + CacheRef Ref = {.Namespace = RequestData.Namespace.value(), + .BucketSegment = RequestData.Bucket.value(), + .HashKey = RequestData.HashKey.value(), + .ValueContentId = RequestData.ValueContentId.value()}; + return HandleCacheValueRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); + } - if (Ref.ValueContentId == IoHash::Zero) + if (RequestData.HashKey.has_value()) { - return HandleCacheRecordRequest(Request, Ref, PolicyFromURL); + ZEN_ASSERT(RequestData.Namespace.has_value()); + ZEN_ASSERT(RequestData.Bucket.has_value()); + CacheRef Ref = {.Namespace = RequestData.Namespace.value(), + .BucketSegment = RequestData.Bucket.value(), + .HashKey = RequestData.HashKey.value(), + .ValueContentId = IoHash::Zero}; + return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); } - else + + if (RequestData.Bucket.has_value()) { - return HandleCacheValueRequest(Request, Ref, PolicyFromURL); + ZEN_ASSERT(RequestData.Namespace.has_value()); + return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); } - return; + ZEN_ASSERT(RequestData.Namespace.has_value()); + return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); } void -HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) +HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view) { switch (Request.RequestVerb()) { @@ -174,15 +410,47 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, break; case HttpVerb::kDelete: - // Drop bucket + // Drop namespace + { + // if (m_CacheStore.DropNamespace(Namespace)) + // { + // return Request.WriteResponse(HttpResponseCode::OK); + // } + // else + // { + // return Request.WriteResponse(HttpResponseCode::NotFound); + // } + } + break; + + default: + break; + } +} - if (m_CacheStore.DropBucket(Bucket)) +void +HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: { - return Request.WriteResponse(HttpResponseCode::OK); + // Query stats } - else + break; + + case HttpVerb::kDelete: + // Drop bucket { - return Request.WriteResponse(HttpResponseCode::NotFound); + if (m_CacheStore.DropBucket(Namespace, Bucket)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } } break; @@ -192,19 +460,19 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { - HandleGetCacheRecord(Request, Ref, PolicyFromURL); + HandleGetCacheRecord(Request, Ref, PolicyFromUrl); } break; case HttpVerb::kPut: - HandlePutCacheRecord(Request, Ref, PolicyFromURL); + HandlePutCacheRecord(Request, Ref, PolicyFromUrl); break; default: break; @@ -212,20 +480,21 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { const ZenContentType AcceptType = Request.AcceptContentType(); - const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); - const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); + const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); + const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); bool Success = false; ZenCacheValue ClientResultValue; - if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query)) + if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) { return Request.WriteResponse(HttpResponseCode::OK); } - if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue)) + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && + m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; ZenContentType ContentType = ClientResultValue.Value.GetContentType(); @@ -286,7 +555,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (LOCAL)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), @@ -303,9 +573,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } - else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote)) + else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -313,17 +583,18 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work - Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) { + Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref](HttpServerRequest& AsyncRequest) { bool Success = false; - const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); - const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal); - const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); + const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); + const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); + const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); - if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); + if (GetUpstreamCacheResult UpstreamResult = + m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; @@ -339,7 +610,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (ValidationResult != CbValidateError::None) { Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid compact binary object from upstream", + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); @@ -350,7 +622,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success && StoreLocal) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) @@ -404,7 +676,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (StoreLocal) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); } BinaryWriter MemStream; @@ -433,14 +705,19 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request else { Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); } } } if (Success) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (UPSTREAM)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), @@ -462,7 +739,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } else { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } @@ -470,7 +747,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { IoBuffer Body = Request.ReadPayload(); @@ -485,12 +762,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) { - ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); @@ -501,11 +778,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (ValidationResult != CbValidateError::None) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid compact binary", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } - CachePolicy Policy = PolicyFromURL; + CachePolicy Policy = PolicyFromUrl; CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; int32_t TotalCount = 0; @@ -519,7 +800,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request TotalCount++; }); - ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)", + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), @@ -528,13 +810,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ValidAttachments.size()); Body.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -547,10 +830,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (!Package.TryLoad(Body)) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } - CachePolicy Policy = PolicyFromURL; + CachePolicy Policy = PolicyFromUrl; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; @@ -577,7 +860,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } else { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), @@ -598,7 +882,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } - ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), @@ -611,13 +896,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -631,16 +917,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: - HandleGetCacheValue(Request, Ref, PolicyFromURL); + HandleGetCacheValue(Request, Ref, PolicyFromUrl); break; case HttpVerb::kPut: - HandlePutCacheValue(Request, Ref, PolicyFromURL); + HandlePutCacheValue(Request, Ref, PolicyFromUrl); break; default: break; @@ -648,44 +934,56 @@ HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { + Stopwatch Timer; + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); bool InUpstreamCache = false; - CachePolicy Policy = PolicyFromURL; - const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); - - if (QueryUpstream) + CachePolicy Policy = PolicyFromUrl; { - if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); - UpstreamResult.Success) + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + + if (QueryUpstream) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) - { - m_CidStore.AddChunk(Compressed); - InUpstreamCache = true; - } - else + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Success) { - ZEN_WARN("got uncompressed upstream cache value"); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + { + m_CidStore.AddChunk(Compressed); + InUpstreamCache = true; + } + else + { + ZEN_WARN("got uncompressed upstream cache value"); + } } } } if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); + ZEN_DEBUG("MISS - '{}/{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(Request.AcceptContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Value.Size()), ToString(Value.GetContentType()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); + InUpstreamCache ? "UPSTREAM" : "LOCAL", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; if (InUpstreamCache) @@ -704,10 +1002,12 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, } void -HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored - ZEN_UNUSED(PolicyFromURL); + ZEN_UNUSED(PolicyFromUrl); + + Stopwatch Timer; IoBuffer Body = Request.ReadPayload(); @@ -734,85 +1034,21 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); - ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("PUT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), - Result.New ? "NEW" : "OLD"); + Result.New ? "NEW" : "OLD", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; Request.WriteResponse(ResponseCode); } -bool -HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) -{ - std::string_view Key = Request.RelativeUri(); - std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); - - if (BucketSplitOffset == std::string_view::npos) - { - return false; - } - - OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); - - if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) - { - return false; - } - - std::string_view HashSegment; - std::string_view ValueSegment; - - std::string_view::size_type ValueSplitOffset = Key.find_last_of('/'); - - // We know there is a slash so no need to check for npos return - - if (ValueSplitOffset == BucketSplitOffset) - { - // Basic cache record lookup - HashSegment = Key.substr(BucketSplitOffset + 1); - } - else - { - // Cache record + valueid lookup - HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1); - ValueSegment = Key.substr(ValueSplitOffset + 1); - } - - if (HashSegment.size() != IoHash::StringLength) - { - return false; - } - - if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength) - { - const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash); - - if (!IsOk) - { - return false; - } - } - else - { - OutRef.ValueContentId = IoHash::Zero; - } - - const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); - - if (!IsOk) - { - return false; - } - - return true; -} - void HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { @@ -888,23 +1124,27 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyView["Bucket"sv]; - CbFieldView HashField = KeyView["Hash"sv]; - CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); - if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); @@ -967,7 +1207,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack } else { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Namespace, Request.Key.Bucket, Request.Key.Hash, ToString(HttpContentType::kCbPackage), @@ -988,7 +1229,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack return PutResult::Invalid; } - ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Namespace, Request.Key.Bucket, Request.Key.Hash, NiceBytes(TransferredSize), @@ -1000,14 +1242,16 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); + m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}); } return PutResult::Success; } @@ -1040,8 +1284,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt bool UsedUpstream = false; }; - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } std::vector<RecordRequestData> Requests; std::vector<size_t> UpstreamIndexes; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); @@ -1069,14 +1318,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt RecordRequestData& Request = Requests.emplace_back(); CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyObject["Bucket"sv]; - CbFieldView HashField = KeyObject["Hash"sv]; - CacheKey& Key = Request.Upstream.Key; - Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); - if (HashField.HasError() || Key.Bucket.empty()) + + CacheKey& Key = Request.Upstream.Key; + if (!GetRpcRequestCacheKey(KeyObject, Key)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } + Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); const CacheRecordPolicy& Policy = Request.DownstreamPolicy; @@ -1085,7 +1333,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt bool FoundLocalInvalid = false; ZenCacheValue RecordCacheValue; - if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) + if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && + m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { Request.RecordCacheValue = std::move(RecordCacheValue.Value); if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) @@ -1199,7 +1448,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } } - const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -1216,7 +1465,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt Request.RecordObject = ObjectBuffer; if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } ParseValues(Request); Request.UsedUpstream = true; @@ -1254,7 +1503,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } else { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", Value.ContentId, Key.Bucket, Key.Hash); + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", + Value.ContentId, + *Namespace, + Key.Bucket, + Key.Hash); } } if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) @@ -1269,7 +1522,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } }; - m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete)); } CbPackage ResponsePackage; @@ -1291,7 +1544,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } } - ZEN_DEBUG("HIT - '{}/{}' {}{}{}", + ZEN_DEBUG("HIT - '{}/{}/{}' {}{}{}", + *Namespace, Key.Bucket, Key.Hash, NiceBytes(Request.RecordCacheValue.Size()), @@ -1307,11 +1561,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); + ZEN_DEBUG("MISS - '{}/{}/{}' {}", *Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); m_CacheStats.MissCount++; } } @@ -1337,20 +1591,25 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyView["Bucket"sv]; - CbFieldView HashField = KeyView["Hash"sv]; - CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); - if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) { return Request.WriteResponse(HttpResponseCode::BadRequest); } + PolicyText = RequestObject["Policy"sv].AsString(); CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); @@ -1373,21 +1632,22 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ { IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value}); + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; } else { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}' FAILED, value is not compressed", Key.Bucket, Key.Hash, RawHash); + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); return Request.WriteResponse(HttpResponseCode::BadRequest); } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && + IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; } @@ -1397,10 +1657,15 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); } Results.push_back(Succeeded); - ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); + ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(TransferredSize), + Succeeded ? "Added"sv : "Invalid"); } if (Results.empty()) { @@ -1431,9 +1696,15 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + struct RequestData { CacheKey Key; @@ -1444,18 +1715,21 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); + std::vector<size_t> RemoteRequestIndexes; + for (CbFieldView RequestField : Params["Requests"sv]) { + Stopwatch Timer; + RequestData& Request = Requests.emplace_back(); CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbFieldView BucketField = KeyObject["Bucket"sv]; - CbFieldView HashField = KeyObject["Hash"sv]; - Request.Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); - if (BucketField.HasError() || HashField.HasError() || Request.Key.Bucket.empty()) + + if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } + PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; @@ -1463,57 +1737,99 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http CachePolicy Policy = Request.Policy; CompressedBuffer& Result = Request.Result; - ZenCacheValue CacheValue; - std::string_view Source; + ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); - if (Result) - { - Source = "LOCAL"sv; - } - } - } - if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) - { - GetUpstreamCacheResult UpstreamResult = - m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary); - if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) - { - Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); - if (Result) - { - UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary); - Source = "UPSTREAM"sv; - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - { - m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value}); - } - } } } - if (Result) { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source); + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(Result.GetCompressed().GetSize()), + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; } + else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + { + RemoteRequestIndexes.push_back(Requests.size() - 1); + } else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; } } + + if (!RemoteRequestIndexes.empty()) + { + std::vector<CacheChunkRequest> RequestedRecordsData; + std::vector<CacheChunkRequest*> CacheChunkRequests; + RequestedRecordsData.reserve(RemoteRequestIndexes.size()); + CacheChunkRequests.reserve(RemoteRequestIndexes.size()); + for (size_t Index : RemoteRequestIndexes) + { + RequestData& Request = Requests[Index]; + RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}}); + CacheChunkRequests.push_back(&RequestedRecordsData.back()); + } + Stopwatch Timer; + m_UpstreamCache.GetCacheValues( + *Namespace, + CacheChunkRequests, + [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { + CacheChunkRequest& ChunkRequest = Params.Request; + if (Params.Value) + { + size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); + size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; + RequestData& Request = Requests[RequestIndex]; + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + if (Request.Result && IsCompressedBinary(Params.Value.GetContentType())) + { + // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement + // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive + // for us to keep the data only on the upstream server. + // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) + m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + *Namespace, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + NiceBytes(Request.Result.GetCompressed().GetSize()), + "UPSTREAM"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + return; + } + } + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + *Namespace, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + "UPSTREAM"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + }); + } + if (Requests.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -1596,6 +1912,7 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); + std::string Namespace; std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream @@ -1605,27 +1922,28 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) + if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); + GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheValues(ValueRequests, UpstreamChunks); + GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally - GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); + GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - WriteGetCacheChunksResponse(Requests, HttpRequest); + WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); } bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests, @@ -1637,11 +1955,20 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + + std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params); + if (!NamespaceText) + { + ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); + return false; + } + Namespace = *NamespaceText; + + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, // we will need to change the pointers to indexes to handle reallocations. @@ -1660,12 +1987,10 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque CbObjectView RequestObject = RequestView.AsObjectView(); CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); ChunkRequest& Request = Requests.emplace_back(); - Request.Key = &RequestKey; + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CbFieldView HashField = KeyObject["Hash"sv]; - RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); - if (RequestKey.Key.Bucket.empty() || HashField.HasError()) + Request.Key = &RequestKey; + if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key)) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); return false; @@ -1730,7 +2055,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque } void -HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, +HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) @@ -1749,7 +2075,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) + if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); @@ -1766,7 +2092,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -1784,10 +2110,10 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); + m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; - m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } std::vector<CacheChunkRequest*> UpstreamPayloadRequests; @@ -1871,7 +2197,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& } void -HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, +HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) { using namespace cache::detail; @@ -1881,7 +2208,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { @@ -1915,7 +2242,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk } void -HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, +HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests) { @@ -1923,7 +2251,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { + const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; @@ -1950,7 +2278,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest } else { - m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + m_CacheStore.Put(Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) @@ -1967,12 +2295,13 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest) { using namespace cache::detail; @@ -1997,7 +2326,8 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai Writer.AddInteger("RawSize"sv, Request.TotalSize); } - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})", + Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, @@ -2008,11 +2338,11 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai } else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { - ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); + ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { - ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); + ZEN_DEBUG("MISS - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } @@ -2081,4 +2411,94 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +#if ZEN_WITH_TESTS + +TEST_CASE("z$service.parse.relative.Uri") +{ + HttpRequestData LegacyBucketRequestBecomesNamespaceRequest; + CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest)); + CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv); + CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value()); + CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value()); + CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value()); + + HttpRequestData LegacyHashKeyRequest; + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest)); + CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(LegacyHashKeyRequest.Bucket == "test"sv); + CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(!LegacyHashKeyRequest.ValueContentId.has_value()); + + HttpRequestData LegacyValueContentIdRequest; + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", + LegacyValueContentIdRequest)); + CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(LegacyValueContentIdRequest.Bucket == "test"sv); + CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); + + HttpRequestData V2DefaultNamespaceRequest; + CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); + CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); + CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); + CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); + CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); + + HttpRequestData V2NamespaceRequest; + CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest)); + CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv); + CHECK(!V2NamespaceRequest.Bucket.has_value()); + CHECK(!V2NamespaceRequest.HashKey.has_value()); + CHECK(!V2NamespaceRequest.ValueContentId.has_value()); + + HttpRequestData V2BucketRequestWithDefaultNamespace; + CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); + CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); + CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); + CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); + CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); + + HttpRequestData V2BucketRequestWithNamespace; + CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace)); + CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv); + CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv); + CHECK(!V2BucketRequestWithNamespace.HashKey.has_value()); + CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); + + HttpRequestData V2HashKeyRequest; + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); + CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(V2HashKeyRequest.Bucket == "test"); + CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(!V2HashKeyRequest.ValueContentId.has_value()); + + HttpRequestData V2ValueContentIdRequest; + CHECK( + HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", + V2ValueContentIdRequest)); + CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv); + CHECK(V2ValueContentIdRequest.Bucket == "test"sv); + CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); + + HttpRequestData Invalid; + CHECK(!HttpRequestParseRelativeUri("", Invalid)); + CHECK(!HttpRequestParseRelativeUri("/", Invalid)); + CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid)); + CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789", + Invalid)); +} + +#endif + +void +z$service_forcelink() +{ +} + } // namespace zen diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 00c4260aa..890a2ebab 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -80,6 +80,7 @@ public: private: struct CacheRef { + std::string Namespace; std::string BucketSegment; IoHash HashKey; IoHash ValueContentId; @@ -98,26 +99,27 @@ private: Invalid, }; - [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleRpcRequest(zen::HttpServerRequest& Request); - void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); - void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); - void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; - virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleRpcRequest(zen::HttpServerRequest& Request); + void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); + void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); + void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace); + void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); + virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ - bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + bool ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests, @@ -125,18 +127,24 @@ private: std::vector<cache::detail::ChunkRequest*>& ValueRequests, CbObjectView RpcRequest); /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + void GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); + void GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + void GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests); /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest); + void WriteGetCacheChunksResponse(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests, + zen::HttpServerRequest& HttpRequest); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -159,4 +167,6 @@ IsCompressedBinary(ZenContentType Type) return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; } +void z$service_forcelink(); + } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 738e4c1fd..da948fd72 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -14,13 +14,13 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> #include <zenstore/cidstore.h> +#include <xxhash.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #endif @@ -30,10 +30,183 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <random> +#endif + ////////////////////////////////////////////////////////////////////////// namespace zen { +namespace { + +#pragma pack(push) +#pragma pack(1) + + struct CacheBucketIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(CacheBucketIndexHeader) == 32); + + struct LegacyDiskLocation + { + inline LegacyDiskLocation() = default; + + inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } + + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value + static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format + + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(LegacyDiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (IsFlagSet(LegacyDiskLocation::kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } + + return ContentType; + } + inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; } + + private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; + }; + + struct LegacyDiskIndexEntry + { + IoHash Key; + LegacyDiskLocation Location; + }; + +#pragma pack(pop) + + static_assert(sizeof(LegacyDiskIndexEntry) == 36); + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* LegacyDataExtension = ".sobs"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + ".tmp" + IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + LogExtension); + } + + std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + LegacyDataExtension); + } + + bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured | + LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString()); + return false; + } + if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + namespace fs = std::filesystem; static CbObject @@ -59,7 +232,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } -ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) +ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc) , GcContributor(Gc) , m_RootDir(RootDir) @@ -75,12 +248,12 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) #endif } -ZenCacheStore::~ZenCacheStore() +ZenCacheNamespace::~ZenCacheNamespace() { } bool -ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { ZEN_TRACE_CPU("Z$::Get"); @@ -118,7 +291,7 @@ ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheVal } void -ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Put"); @@ -154,7 +327,7 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa } bool -ZenCacheStore::DropBucket(std::string_view Bucket) +ZenCacheNamespace::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); @@ -170,13 +343,13 @@ ZenCacheStore::DropBucket(std::string_view Bucket) } void -ZenCacheStore::Flush() +ZenCacheNamespace::Flush() { m_DiskLayer.Flush(); } void -ZenCacheStore::Scrub(ScrubContext& Ctx) +ZenCacheNamespace::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { @@ -190,11 +363,11 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) } void -ZenCacheStore::GatherReferences(GcContext& GcCtx) +ZenCacheNamespace::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = + MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); access_tracking::AccessTimes AccessTimes; m_MemLayer.GatherAccessTimes(AccessTimes); @@ -204,14 +377,14 @@ ZenCacheStore::GatherReferences(GcContext& GcCtx) } void -ZenCacheStore::CollectGarbage(GcContext& GcCtx) +ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) { m_MemLayer.Reset(); m_DiskLayer.CollectGarbage(GcCtx); } GcStorageSize -ZenCacheStore::StorageSize() const +ZenCacheNamespace::StorageSize() const { return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; } @@ -425,6 +598,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; + m_BlocksBasePath = BucketDir / "blocks"; + CreateDirectories(BucketDir); std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; @@ -470,48 +645,465 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { - m_BucketDir = BucketDir; + ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", + m_BucketDir / m_BucketName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - uint64_t MaxFileOffset = 0; - uint64_t InvalidEntryCount = 0; - m_SobsCursor = 0; - m_TotalSize = 0; + namespace fs = std::filesystem; - m_Index.clear(); + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + m_SlogFile.Flush(); - std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; - std::filesystem::path SlogPath{BucketDir / "zen.slog"}; + // Write the current state of the location map to a new index state + uint64_t LogCount = 0; + std::vector<DiskIndexEntry> Entries; - m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + { + Entries.resize(m_Index.size()); - m_SlogFile.Replay( - [&](const DiskIndexEntry& Entry) { - if (Entry.Key == IoHash::Zero) + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) { - ++InvalidEntryCount; + DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second.Location; } - else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + + LogCount = m_SlogFile.GetLogCount(); + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadIndexFile() +{ + std::vector<DiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + m_BucketDir / m_BucketName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) { - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + + return Header.LogPosition; } else { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + ZEN_WARN("skipping invalid index file '{}'", IndexPath); } - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); - }, - 0); + } + } + return 0; +} +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) +{ + std::vector<DiskIndexEntry> Entries; + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(LogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + m_Index.reserve(ReadCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount())); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + } + } + } + return 0; +}; + +uint64_t +ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) +{ + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } + + ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName); + + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([&] { + ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", + m_BucketDir / m_BucketName, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), + NiceBytes(TotalSize)); + }); + + uint64_t BlockFileSize = 0; + { + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); + } + + std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; + uint64_t InvalidEntryCount = 0; + + size_t BlockChunkCount = 0; + TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([&] { + ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", + LegacyLogPath, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) + { + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyDiskIndexEntry& Record) { + if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + std::string InvalidEntryReason; + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (m_Index.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); + + std::vector<IoHash> BadEntries; + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + continue; + } + if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) + { + BlockChunkCount++; + continue; + } + ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); + BadEntries.push_back(Entry.first); + } + for (const IoHash& BadHash : BadEntries) + { + LegacyDiskIndex.erase(BadHash); + } + InvalidEntryCount += BadEntries.size(); + } + } if (InvalidEntryCount) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath); + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); } - m_SobsCursor = (MaxFileOffset + 15) & ~15; + if (LegacyDiskIndex.empty()) + { + LegacyCasLog.Close(); + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return 0; + } + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + CreateDirectories(LogPath.parent_path()); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(BlockChunkCount); + ChunkLocations.reserve(BlockChunkCount); + + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount); + + for (const auto& Entry : LegacyDiskIndex) + { + const IoHash& ChunkHash = Entry.first; + const LegacyDiskLocation& Location = Entry.second.Location; + if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + uint8_t Flags = 0xff & (Location.Flags() >> 56); + DiskLocation NewLocation = DiskLocation(Location.Size(), Flags); + LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); + continue; + } + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.Size(); + } + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + CasLog.Append(LogEntries); + + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const BlockStore::MovedChunksArray& MovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + uint8_t Flags = 0xff & (OldLocation.Flags() >> 56); + LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)}); + } + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + CasLog.Append(LogEntries); + CasLog.Flush(); + if (CleanSource) + { + std::vector<LegacyDiskIndexEntry> LegacyLogEntries; + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + LegacyDiskLocation NewLocation(OldLocation.Offset(), + OldLocation.Size(), + 0, + OldLocation.Flags() | LegacyDiskLocation::kTombStone); + LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation}); + } + LegacyCasLog.Append(LegacyLogEntries); + LegacyCasLog.Flush(); + } + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); + + LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return MigratedChunkCount; +} + +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +{ + m_BucketDir = BucketDir; + + m_TotalSize = 0; + + m_Index.clear(); + + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + fs::remove(LegacyLogPath); + fs::remove(LegacyDataPath); + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); + } + + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + + CreateDirectories(m_BucketDir); + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_Index.size()); + for (const auto& Entry : m_Index) + { + const DiskLocation& Location = Entry.second.Location; + m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed); + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + KnownLocations.push_back(BlockLocation); + } + + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + + if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here } void @@ -532,12 +1124,13 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + + OutValue.Value = m_BlockStore.TryGetChunk(Location); + if (!OutValue.Value) { return false; } - - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -562,23 +1155,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return false; } -void -ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc, - const IoHash& HashKey, - const fs::path& Path, - std::error_code& Ec) -{ - ZEN_DEBUG("deleting standalone cache file '{}'", Path); - fs::remove(Path, Ec); - - if (!Ec) - { - m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}}); - m_Index.erase(HashKey); - m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } -} - bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -588,23 +1164,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } RwLock::SharedLockScope _(m_IndexLock); - - if (auto It = m_Index.find(HashKey); It != m_Index.end()) + auto It = m_Index.find(HashKey); + if (It == m_Index.end()) { - IndexEntry& Entry = It.value(); - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - - if (GetInlineCacheValue(Entry.Location, OutValue)) - { - return true; - } - + return false; + } + IndexEntry& Entry = It.value(); + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + DiskLocation Location = Entry.Location; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + // We don't need to hold the index lock when we read a standalone file _.ReleaseNow(); - - return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); + return GetStandaloneCacheValue(Location, HashKey, OutValue); } - - return false; + return GetInlineCacheValue(Location, OutValue); } void @@ -619,54 +1193,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { return PutStandaloneCacheValue(HashKey, Value); } - else - { - // Small object put - - uint64_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags); - - m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16); - - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Loc; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); - } + PutInlineCacheValue(HashKey, Value); } void ZenCacheDiskLayer::CacheBucket::Drop() { - // TODO: add error handling - - m_SobsFile.Close(); + m_BlockStore.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } @@ -674,11 +1207,10 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - RwLock::SharedLockScope _(m_IndexLock); - - m_SobsFile.Flush(); - m_SlogFile.Flush(); + m_BlockStore.Flush(); + RwLock::SharedLockScope _(m_IndexLock); + MakeIndexSnapshot(); SaveManifest(); } @@ -724,20 +1256,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) ZenCacheValue Value; - if (GetInlineCacheValue(Loc, Value)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Validate contents + if (GetStandaloneCacheValue(Loc, HashKey, Value)) + { + // Note: we cannot currently validate contents since we don't + // have a content hash! + continue; + } } - else if (GetStandaloneCacheValue(Loc, HashKey, Value)) + else if (GetInlineCacheValue(Loc, Value)) { - // Note: we cannot currently validate contents since we don't - // have a content hash! - } - else - { - // Value not found - BadKeys.push_back(HashKey); + // Validate contents + continue; } + // Value not found + BadKeys.push_back(HashKey); } } @@ -754,12 +1288,18 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const DiskLocation& Location = It->second.Location; - m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); + const auto It = m_Index.find(BadKey); + DiskLocation Location = It->second.Location; + Location.Flags |= DiskLocation::kTombStone; + m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCasChunks(BadKeys); } void @@ -767,68 +1307,95 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); - Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs)); + }); const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - RwLock::SharedLockScope _(m_IndexLock); - - std::vector<IoHash> ValidKeys; - std::vector<IoHash> ExpiredKeys; - std::vector<IoHash> Cids; - std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end()); - - std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) { - return LHS.second.LastAccess < RHS.second.LastAccess; - }); + IndexMap Index; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + Index = m_Index; + } - const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) { - const IndexEntry& Entry = Kv.second; - return Entry.LastAccess < Ticks; - }); + std::vector<IoHash> ExpiredKeys; + ExpiredKeys.reserve(1024); + std::vector<IoHash> Cids; Cids.reserve(1024); - for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv) + for (const auto& Entry : Index) { - const IoHash& Key = Kv->first; - const DiskLocation& Loc = Kv->second.Location; + const IoHash& Key = Entry.first; + if (Entry.second.LastAccess < ExpireTicks) + { + ExpiredKeys.push_back(Key); + continue; + } + + const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { - ZenCacheValue CacheValue; - if (!GetInlineCacheValue(Loc, CacheValue)) + if (Cids.size() > 1024) { - GetStandaloneCacheValue(Loc, Key, CacheValue); + GcCtx.ContributeCids(Cids); + Cids.clear(); } - if (CacheValue.Value) + ZenCacheValue CacheValue; { - ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); - if (Cids.size() > 1024) + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - GcCtx.ContributeCids(Cids); - Cids.clear(); + if (!GetStandaloneCacheValue(Loc, Key, CacheValue)) + { + continue; + } + } + else if (!GetInlineCacheValue(Loc, CacheValue)) + { + continue; } - CbObject Obj(SharedBuffer{CacheValue.Value}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } + + ZEN_ASSERT(CacheValue.Value); + ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{CacheValue.Value}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } - ValidKeys.reserve(std::distance(ValidIt, Entries.end())); - ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); - - std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; }); - std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; }); - GcCtx.ContributeCids(Cids); - GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys)); + GcCtx.ContributeCacheKeys(m_BucketName, std::move(ExpiredKeys)); } void @@ -836,203 +1403,282 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - Flush(); - - RwLock::ExclusiveLockScope _(m_IndexLock); - - const uint64_t OldCount = m_Index.size(); - const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); - - ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); - - Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] { - const uint64_t NewCount = m_Index.size(); - const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); - ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})", - m_BucketDir, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - OldCount - NewCount, - NiceBytes(OldTotalSize - NewTotalSize), - OldCount, - NiceBytes(OldTotalSize)); + ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName); + + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + uint64_t DeletedCount = 0; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([&] { + ZEN_INFO( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "entires ({}).", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + RwLock::SharedLockScope _(m_IndexLock); SaveManifest(); }); - if (m_Index.empty()) + m_SlogFile.Flush(); + + std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName); + std::vector<IoHash> DeleteCacheKeys; + DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); + GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + return; + } + DeleteCacheKeys.push_back(ChunkHash); + }); + if (DeleteCacheKeys.empty()) { + ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } - auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) { - for (const IoHash& Key : Keys) + std::vector<DiskIndexEntry> ExpiredStandaloneEntries; + IndexMap Index; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) { - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - OutEntries.push_back(*It); - } + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; } - }; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - std::vector<IndexMap::value_type> ValidEntries; - std::vector<IndexMap::value_type> ExpiredEntries; - - AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries); - AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries); - - // Remove all standalone file(s) - // NOTE: This can probably be made asynchronously - { - std::error_code Ec; - ExtendablePathBuilder<256> Path; + SaveManifest(); + Index = m_Index; - for (const auto& Entry : ExpiredEntries) + for (const IoHash& Key : DeleteCacheKeys) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + if (auto It = Index.find(Key); It != Index.end()) { - Path.Reset(); - BuildPath(Path, Key); - - // NOTE: this will update index and log file - DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec); - - if (Ec) + DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); } } } + if (GcCtx.IsDeletionMode()) + { + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } } - if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty()) + if (GcCtx.IsDeletionMode()) { - // Naive GC implementation of small objects. Needs enough free - // disk space to store intermediate sob container along side the - // old container - - const auto ResetSobStorage = [this, &ValidEntries]() { - m_SobsFile.Close(); - m_SlogFile.Close(); + std::error_code Ec; + ExtendablePathBuilder<256> Path; - const bool IsNew = true; - m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + for (const auto& Entry : ExpiredStandaloneEntries) + { + const IoHash& Key = Entry.Key; + const DiskLocation& Loc = Entry.Location; - m_SobsCursor = 0; - m_TotalSize = 0; - m_Index.clear(); + Path.Reset(); + BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); - for (const auto& Entry : ValidEntries) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; + } + __.ReleaseNow(); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if (fs::is_regular_file(FilePath)) { - m_SlogFile.Append({.Key = Key, .Location = Loc}); - m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(FilePath, Ec); } } - }; - uint64_t NewContainerSize{}; - for (const auto& Entry : ValidEntries) - { - const DiskLocation& Loc = Entry.second.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) + if (Ec) { - NewContainerSize += (Loc.Size() + sizeof(DiskLocation)); + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + Ec.clear(); + DiskLocation RestoreLocation = Loc; + RestoreLocation.Flags &= ~DiskLocation::kTombStone; + + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + continue; + } + m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + m_Index.insert({Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + continue; } + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedSize += Entry.Location.Size(); + DeletedCount++; } + } - if (NewContainerSize == 0) - { - ResetSobStorage(); - return; - } + TotalChunkCount = Index.size(); - const uint64_t DiskSpaceMargin = (256 << 10); + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : Index) + { + const DiskLocation& Location = Entry.second.Location; - std::error_code Ec; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); - if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) + if (Location.Flags & DiskLocation::kStandaloneFile) { - ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", - m_BucketDir, - NiceBytes(NewContainerSize), - NiceBytes(Space.Free)); - return; + continue; } + TotalChunkHashes.push_back(Entry.first); + } - std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; - std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; - - // Copy non expired sob(s) to temporary sob container - + if (TotalChunkHashes.empty()) + { + return; + } + TotalChunkCount = TotalChunkHashes.size(); + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& DiskLocation = KeyIt->second.Location; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + if (Keep) { - BasicFile TmpSobs; - TCasLogFile<DiskIndexEntry> TmpLog; - uint64_t TmpCursor{}; - std::vector<uint8_t> Chunk; + KeepChunkIndexes.push_back(ChunkIndex); + } + }); - TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate); - TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate); + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - for (const auto& Entry : ValidEntries) - { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BucketDir / m_BucketName, + DeleteCount, + 0, // NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; + } - DiskLocation NewLoc; + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back( + {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back({.Key = ChunkHash, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), + m_PayloadAlignment, + OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + DeletedChunks.push_back(ChunkHash); + } - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags()); - } - else + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + for (const DiskIndexEntry& Entry : LogEntries) { - Chunk.resize(Loc.Size()); - m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset()); - - NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags()); - TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); - TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + if (Entry.Location.GetFlags() & DiskLocation::kTombStone) + { + m_Index.erase(Entry.Key); + uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size; + m_TotalSize.fetch_sub(ChunkSize); + continue; + } + m_Index[Entry.Key].Location = Entry.Location; } - - TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc}); } - } - - // Swap state - try - { - fs::path SobsPath{m_BucketDir / "zen.sobs"}; - fs::path SlogPath{m_BucketDir / "zen.slog"}; - - m_SobsFile.Close(); - m_SlogFile.Close(); - - fs::remove(SobsPath); - fs::remove(SlogPath); - - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + }, + [&]() { return GcCtx.CollectSmallObjects(); }); - const bool IsNew = false; - OpenLog(m_BucketDir, IsNew); - } - catch (std::exception& Err) - { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); - ResetSobStorage(); - } - } + GcCtx.DeletedCas(DeletedChunks); } void @@ -1079,96 +1725,187 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); + uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); - } + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); + } + + bool CleanUpTempFile = false; + auto __ = MakeGuard([&] { + if (CleanUpTempFile) + { + std::error_code Ec; + std::filesystem::remove(DataFile.GetPath(), Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + } + } + }); DataFile.WriteAll(Value.Value, Ec); - if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size()))); + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", + NiceBytes(NewFileSize), + DataFile.GetPath().string(), + m_BucketDir)); } - // Move file into place (atomically) - + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - - if (Ec) + // We retry to move the file since it can be held open for read. + // This happens if the server processes a Get request for the file or + // if we are busy sending the file upstream + int RetryCount = 4; + do { - int RetryCount = 3; - - do + Ec.clear(); { - std::filesystem::path ParentPath = FsPath.parent_path(); - CreateDirectories(ParentPath); + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (!Ec) + // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file + // will be disabled as the file handle has already been closed + CleanUpTempFile = Ec ? true : false; + + if (Ec) { - break; + std::error_code ExistingEc; + uint64_t OldFileSize = std::filesystem::file_size(FsPath, ExistingEc); + if (!ExistingEc && (OldFileSize == NewFileSize)) + { + ZEN_INFO( + "Failed to move temporary file '{}' to '{}' for '{}'. Target file has same size, assuming concurrent write of same " + "value, " + "move " + "failed with reason '{}'", + DataFile.GetPath(), + FsPath.string(), + m_BucketDir, + Ec.message()); + return; + } } + } - std::error_code InnerEc; - const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc); + if (!Ec) + { + uint8_t EntryFlags = DiskLocation::kStandaloneFile; - if (!InnerEc && ExistingFileSize == Value.Value.Size()) + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { - // Concurrent write of same value? - return; + EntryFlags |= DiskLocation::kCompressed; } - // Semi arbitrary back-off - zen::Sleep(1000 * RetryCount); - } while (RetryCount--); + DiskLocation Loc(NewFileSize, EntryFlags); + IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8())); + uint64_t OldFileSize = 0; + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Entry}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + OldFileSize = It.value().Location.Size(); + It.value() = Entry; + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + if (OldFileSize <= NewFileSize) + { + m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed); + } + else + { + m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed); + } + return; } - } - // Update index + std::filesystem::path ParentPath = FsPath.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) + { + Ec.clear(); + std::filesystem::create_directories(ParentPath, Ec); + if (!Ec) + { + // Retry without sleep + continue; + } + throw std::system_error( + Ec, + fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); + } - uint64_t EntryFlags = DiskLocation::kStandaloneFile; + ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'", + DataFile.GetPath().string(), + FsPath.string(), + m_BucketDir, + Ec.message()); - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } + // Semi arbitrary back-off + zen::Sleep(200 * (5 - RetryCount)); // Sleep at most for a total of 3 seconds + } while (RetryCount-- > 0); - RwLock::ExclusiveLockScope _(m_IndexLock); + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); +} - DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); - IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); +void +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint8_t EntryFlags = 0; - if (auto It = m_Index.find(HashKey); It == m_Index.end()) + if (Value.Value.GetContentType() == ZenContentType::kCbObject) { - // Previously unknown object - m_Index.insert({HashKey, Entry}); + EntryFlags |= DiskLocation::kStructured; } - else + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { - // TODO: should check if write is idempotent and bail out if it is? - It.value() = Entry; + EntryFlags |= DiskLocation::kCompressed; } - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } + }); + m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// @@ -1255,10 +1992,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z auto It = m_Buckets.try_emplace(BucketName, BucketName); Bucket = &It.first->second; - std::filesystem::path bucketPath = m_RootDir; - bucketPath /= BucketName; + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; - Bucket->OpenOrCreate(bucketPath); + Bucket->OpenOrCreate(BucketPath); } } @@ -1273,49 +2010,23 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z void ZenCacheDiskLayer::DiscoverBuckets() { - FileSystemTraversal Traversal; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& File, - [[maybe_unused]] uint64_t FileSize) override - { - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override - { - Dirs.push_back((decltype(Dirs)::value_type)(DirectoryName)); - return false; - } - - std::vector<std::filesystem::path::string_type> Dirs; - } Visit; - - Traversal.TraverseFileSystem(m_RootDir, Visit); + DirectoryContent DirContent; + GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); // Initialize buckets RwLock::ExclusiveLockScope _(m_Lock); - for (const auto& BucketName : Visit.Dirs) + for (const std::filesystem::path& BucketPath : DirContent.Directories) { + std::string BucketName = PathToUtf8(BucketPath.stem()); // New bucket needs to be created - -#if ZEN_PLATFORM_WINDOWS - std::string BucketName8 = WideToUtf8(BucketName); -#else - const auto& BucketName8 = BucketName; -#endif - - if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { } else { - auto InsertResult = m_Buckets.try_emplace(BucketName8, BucketName8); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName8; + auto InsertResult = m_Buckets.try_emplace(BucketName, BucketName); CacheBucket& Bucket = InsertResult.first->second; @@ -1323,11 +2034,11 @@ ZenCacheDiskLayer::DiscoverBuckets() if (Bucket.IsOk()) { - ZEN_INFO("Discovered bucket '{}'", BucketName8); + ZEN_INFO("Discovered bucket '{}'", BucketName); } else { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); } @@ -1363,11 +2074,10 @@ void ZenCacheDiskLayer::Flush() { std::vector<CacheBucket*> Buckets; - Buckets.reserve(m_Buckets.size()); { RwLock::SharedLockScope _(m_Lock); - + Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(&Kv.second); @@ -1416,6 +2126,176 @@ ZenCacheDiskLayer::TotalSize() const return TotalSize; } +//////////////////////////// ZenCacheStore + +static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; + +ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) +{ + CreateDirectories(BasePath); + + DirectoryContent DirContent; + GetDirectoryContent(BasePath, DirectoryContent::IncludeDirsFlag, DirContent); + + std::vector<std::string> LegacyBuckets; + std::vector<std::string> Namespaces; + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + std::string DirName = PathToUtf8(DirPath.stem()); + if (DirName.starts_with(NamespaceDiskPrefix)) + { + Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); + continue; + } + LegacyBuckets.push_back(DirName); + } + + ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size()); + + if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) + { + // default (unspecified) and ue4-ddc namespace points to the same namespace instance + + ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName); + + std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); + CreateDirectories(DefaultNamespaceFolder); + + // Move any non-namespace folders into the default namespace folder + for (const std::string& DirName : LegacyBuckets) + { + std::filesystem::path LegacyFolder = BasePath / DirName; + std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; + std::error_code Ec; + std::filesystem::rename(LegacyFolder, NewPath, Ec); + if (Ec) + { + ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message()); + } + } + Namespaces.push_back(std::string(UE4DDCNamespaceName)); + } + + for (const std::string& NamespaceName : Namespaces) + { + m_Namespaces[NamespaceName] = + std::make_unique<ZenCacheNamespace>(Gc, BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); + } +} + +ZenCacheStore::~ZenCacheStore() +{ + m_Namespaces.clear(); +} + +bool +ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + return Store->Get(Bucket, HashKey, OutValue); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); + return false; +} + +void +ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) +{ + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + return Store->Put(Bucket, HashKey, Value); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); +} + +bool +ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) +{ + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + return Store->DropBucket(Bucket); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}'", Namespace, Bucket); + return false; +} + +void +ZenCacheStore::Flush() +{ + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); +} + +void +ZenCacheStore::Scrub(ScrubContext& Ctx) +{ + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); +} + +ZenCacheNamespace* +ZenCacheStore::GetNamespace(std::string_view Namespace) +{ + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) + { + return It->second.get(); + } + if (Namespace == DefaultNamespace) + { + if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) + { + return It->second.get(); + } + } + return nullptr; +} + +void +ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const +{ + std::vector<std::pair<std::string, ZenCacheNamespace&> > Namespaces; + { + RwLock::SharedLockScope _(m_NamespacesLock); + Namespaces.reserve(m_Namespaces.size()); + for (const auto& Entry : m_Namespaces) + { + if (Entry.first == DefaultNamespace) + { + continue; + } + Namespaces.push_back({Entry.first, *Entry.second}); + } + } + for (auto& Entry : Namespaces) + { + Callback(Entry.first, Entry.second); + } +} + +void +ZenCacheStore::GatherReferences(GcContext& GcCtx) +{ + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.GatherReferences(GcCtx); }); +} + +void +ZenCacheStore::CollectGarbage(GcContext& GcCtx) +{ + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.CollectGarbage(GcCtx); }); +} + +GcStorageSize +ZenCacheStore::StorageSize() const +{ + GcStorageSize Size; + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { + GcStorageSize StoreSize = Store.StorageSize(); + Size.MemorySize += StoreSize.MemorySize; + Size.DiskSize += StoreSize.DiskSize; + }); + return Size; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS @@ -1427,10 +2307,18 @@ namespace testutils { IoBuffer CreateBinaryCacheValue(uint64_t Size) { - std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t))); - std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; }); + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); - IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t)); + IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); Buf.SetContentType(ZenContentType::kBinary); return Buf; }; @@ -1443,7 +2331,7 @@ TEST_CASE("z$.store") CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const int kIterationCount = 100; @@ -1496,8 +2384,8 @@ TEST_CASE("z$.size") GcStorageSize CacheSize; { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); @@ -1516,8 +2404,8 @@ TEST_CASE("z$.size") } { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -1539,8 +2427,8 @@ TEST_CASE("z$.size") GcStorageSize CacheSize; { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); @@ -1559,8 +2447,8 @@ TEST_CASE("z$.size") } { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -1597,9 +2485,9 @@ TEST_CASE("z$.gc") }; { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "teardrinker"sv; + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; // Create a cache record const IoHash Key = CreateKey(42); @@ -1635,7 +2523,7 @@ TEST_CASE("z$.gc") // Expect timestamps to be serialized { CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); std::vector<IoHash> Keep; // Collect garbage with 1 hour max cache duration @@ -1656,7 +2544,7 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "fortysixandtwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -1704,7 +2592,7 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "rightintwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -1737,6 +2625,7 @@ TEST_CASE("z$.gc") GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); + Zcs.Flush(); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) @@ -1751,6 +2640,502 @@ TEST_CASE("z$.gc") } } +TEST_CASE("z$.legacyconversion") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[] = {2041, + 1123, + 1223, + 1239, + 341, + 1412, + 912, + 774, + 341, + 431, + 554, + 1098, + 2048, + 339 + 64 * 1024, + 561 + 64 * 1024, + 16 + 64 * 1024, + 16 + 64 * 1024, + 2048, + 2048}; + size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); + size_t SingleBlockSize = 0; + std::vector<IoBuffer> Chunks; + Chunks.reserve(ChunkCount); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(testutils::CreateBinaryCacheValue(Size)); + SingleBlockSize += Size; + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunkCount); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CreateDirectories(TempDir.Path()); + + const std::string Bucket = "rightintwo"; + { + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path()); + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + for (size_t i = 0; i < ChunkCount; i++) + { + Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]}); + } + + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < ChunkCount; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcContext GcCtx(CurrentTime + std::chrono::hours(2)); + GcCtx.MaxCacheDuration(std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepChunks); + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + } + std::filesystem::path BucketDir = TempDir.Path() / Bucket; + std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; + + std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1); + std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); + std::filesystem::remove(LegacyDataPath); + std::filesystem::rename(CasPath, LegacyDataPath); + + std::vector<DiskIndexEntry> LogEntries; + std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket); + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion && + Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + { + LogEntries.resize(Header.EntryCount); + ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + } + } + ObjectIndexFile.Close(); + std::filesystem::remove(IndexPath); + } + + std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket); + { + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + LogEntries.reserve(CasLog.GetLogCount()); + CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); + } + TCasLogFile<LegacyDiskIndexEntry> LegacyLog; + std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir); + LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); + + for (const DiskIndexEntry& Entry : LogEntries) + { + uint64_t Size; + uint64_t Offset; + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Size = Entry.Location.Location.StandaloneSize; + Offset = 0; + } + else + { + BlockStoreLocation Location = Entry.Location.GetBlockLocation(16); + Size = Location.Size; + Offset = Location.Offset; + } + LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56); + LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation}; + LegacyLog.Append(LegacyEntry); + } + LegacyLog.Close(); + + std::filesystem::remove_all(BlocksBaseDir); + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + + { + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path()); + + for (size_t i = 0; i < ChunkCount; i += 2) + { + ZenCacheValue Value; + CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value)); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value)); + CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value)); + } + } +} + +TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + struct Chunk + { + std::string Bucket; + IoBuffer Buffer; + }; + std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks; + Chunks.reserve(kChunkCount); + + const std::string Bucket1 = "rightinone"; + const std::string Bucket2 = "rightintwo"; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + while (true) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + break; + } + while (true) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + break; + } + } + + CreateDirectories(TempDir.Path()); + + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path()); + + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); + + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; + + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + { + std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } + } + + std::atomic<size_t> WorkCompleted = 0; + std::atomic_uint32_t AddedChunkCount = 0; + for (const auto& Chunk : NewChunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }); + } + + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + } + WorkCompleted.fetch_add(1); + }); + } + while (AddedChunkCount.load() < NewChunks.size()) + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + } + std::vector<IoHash> KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Zcs.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (WorkCompleted < NewChunks.size() + Chunks.size()) + { + Sleep(1); + } + + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + } + std::vector<IoHash> KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Zcs.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < GcChunkHashes.size()) + { + Sleep(1); + } + } + } +} + +TEST_CASE("z$.namespaces") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; + const auto CustomNamespace = "mynamespace"sv; + + // Create a cache record + const IoHash Key = CreateKey(42); + CbObject CacheValue = CreateCacheValue(4096); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue); + + ZenCacheValue GetValue; + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue)); + + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + + // This should just be dropped for now until we decide how we add namespaces + Zcs.Put(CustomNamespace, Bucket, Key, PutValue); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + + const IoHash Key2 = CreateKey(43); + CbObject CacheValue2 = CreateCacheValue(4096); + + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); + ZenCacheValue PutValue2 = {.Value = Buffer2}; + Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); + + CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); + } +} + +TEST_CASE("z$.blocked.disklayer.put") +{ + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + size_t Key = Buffer.Size(); + IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); + + ZenCacheValue BufferGet; + CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); + + // Overwriting with a value of same size should go fine + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); + + CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); +# if ZEN_PLATFORM_WINDOWS + // On Windows platform, overwriting with different size while we have + // it open for read should throw exception if file is held open + CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2})); +# else + // Other platforms should handle overwrite just fine + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); +# endif + + BufferGet = ZenCacheValue{}; + + // Read access has been removed, we should now be able to overwrite it + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); +} + #endif void diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index f39d01747..34b8d18f4 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -8,6 +8,7 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/basicfile.h> +#include <zenstore/blockstore.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> @@ -76,37 +77,32 @@ struct DiskLocation { inline DiskLocation() = default; - inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) - : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) - , LowerSize(ValueSize & 0xFFFFffff) - , IndexDataSize(IndexSize) + inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } + + inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) { + this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); } - static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) - static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value - static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format - - static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const + { + ZEN_ASSERT(!(Flags & kStandaloneFile)); + return Location.BlockLocation.Get(PayloadAlignment); + } - inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } - inline uint64_t Size() const { return LowerSize; } - inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } - inline uint64_t GetFlags() const { return OffsetAndFlags & kFlagsMask; } + inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } + inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } + inline uint8_t GetFlags() const { return Flags; } inline ZenContentType GetContentType() const { ZenContentType ContentType = ZenContentType::kBinary; - if (IsFlagSet(DiskLocation::kStructured)) + if (IsFlagSet(kStructured)) { ContentType = ZenContentType::kCbObject; } - if (IsFlagSet(DiskLocation::kCompressed)) + if (IsFlagSet(kCompressed)) { ContentType = ZenContentType::kCompressedBinary; } @@ -114,21 +110,29 @@ struct DiskLocation return ContentType; } -private: - uint64_t OffsetAndFlags = 0; - uint32_t LowerSize = 0; - uint32_t IndexDataSize = 0; + union + { + BlockStoreDiskLocation BlockLocation; // 10 bytes + uint64_t StandaloneSize = 0; // 8 bytes + } Location; + + static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file + static const uint8_t kStructured = 0x40u; // Serialized as compact binary + static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value + static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format + + uint8_t Flags = 0; + uint8_t Reserved = 0; }; struct DiskIndexEntry { - IoHash Key; - DiskLocation Location; + IoHash Key; // 20 bytes + DiskLocation Location; // 12 bytes }; - #pragma pack(pop) -static_assert(sizeof(DiskIndexEntry) == 36); +static_assert(sizeof(DiskIndexEntry) == 32); /** In-memory cache storage @@ -245,15 +249,19 @@ private: inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } private: + const uint64_t MaxBlockSize = 1ull << 30; + uint64_t m_PayloadAlignment = 1ull << 4; + std::string m_BucketName; std::filesystem::path m_BucketDir; + std::filesystem::path m_BlocksBasePath; + BlockStore m_BlockStore; Oid m_BucketId; bool m_IsOk = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket - BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; struct IndexEntry @@ -264,10 +272,12 @@ private: IndexEntry() : Location(), LastAccess() {} IndexEntry(const DiskLocation& Loc, const int64_t Timestamp) : Location(Loc), LastAccess(Timestamp) {} IndexEntry(const IndexEntry& E) : Location(E.Location), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) {} - IndexEntry(IndexEntry&& E) : Location(std::move(E.Location)), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) {} + IndexEntry(IndexEntry&& E) noexcept : Location(std::move(E.Location)), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) + { + } IndexEntry& operator=(const IndexEntry& E) { return *this = IndexEntry(E); } - IndexEntry& operator=(IndexEntry&& E) + IndexEntry& operator=(IndexEntry&& E) noexcept { Location = std::move(E.Location); LastAccess.store(E.LastAccess.load(), std::memory_order_relaxed); @@ -277,20 +287,21 @@ private: using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; - RwLock m_IndexLock; - IndexMap m_Index; - uint64_t m_SobsCursor = 0; + RwLock m_IndexLock; + IndexMap m_Index; + std::atomic_uint64_t m_TotalSize{}; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); - void DeleteStandaloneCacheValue(const DiskLocation& Loc, - const IoHash& HashKey, - const std::filesystem::path& Path, - std::error_code& Ec); - bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); - void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t LogPosition); + uint64_t MigrateLegacyData(bool CleanSource); + void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash @@ -311,11 +322,11 @@ private: ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; }; -class ZenCacheStore final : public GcStorage, public GcContributor +class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor { public: - ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir); - ~ZenCacheStore(); + ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir); + ~ZenCacheNamespace(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); @@ -338,8 +349,38 @@ private: std::unique_ptr<ZenCacheTracker> m_AccessTracker; #endif - ZenCacheStore(const ZenCacheStore&) = delete; - ZenCacheStore& operator=(const ZenCacheStore&) = delete; + ZenCacheNamespace(const ZenCacheNamespace&) = delete; + ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; +}; + +class ZenCacheStore final : public GcStorage, public GcContributor +{ +public: + static constexpr std::string_view DefaultNamespace = + "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given + static constexpr std::string_view NamespaceDiskPrefix = "ns_"; + + ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath); + ~ZenCacheStore(); + + bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool DropBucket(std::string_view Namespace, std::string_view Bucket); + void Flush(); + void Scrub(ScrubContext& Ctx); + + virtual void GatherReferences(GcContext& GcCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; + +private: + ZenCacheNamespace* GetNamespace(std::string_view Namespace); + void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const; + + typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NameSpaceMap; + + mutable RwLock m_NamespacesLock; + NameSpaceMap m_Namespaces; }; void z$_forcelink(); diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp index dd31013ef..171c67a6e 100644 --- a/zenserver/compute/function.cpp +++ b/zenserver/compute/function.cpp @@ -39,15 +39,17 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, { m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); - auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - m_CasStore, - m_CidStore, - Mgr); - m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); - m_UpstreamApply->Initialize(); + InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] { + auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + m_CasStore, + m_CidStore, + Mgr); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + }}; m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); @@ -58,8 +60,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - // Todo: check upstream health - return HttpReq.WriteResponse(HttpResponseCode::OK); + return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable); }, HttpVerb::kGet); diff --git a/zenserver/compute/function.h b/zenserver/compute/function.h index 2ddddabb4..efabe96ee 100644 --- a/zenserver/compute/function.h +++ b/zenserver/compute/function.h @@ -48,6 +48,7 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; private: + std::thread InitializeThread; spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; HttpRequestRouter m_Router; diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 617f50660..d18ae9e1a 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -804,29 +804,12 @@ ProjectStore::Project::DeleteOplog(std::string_view OplogId) void ProjectStore::Project::DiscoverOplogs() { - FileSystemTraversal Traversal; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& File, - [[maybe_unused]] uint64_t FileSize) override - { - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override - { - Dirs.push_back(PathToUtf8(DirectoryName)); - return false; - } - - std::vector<std::string> Dirs; - } Visit; - - Traversal.TraverseFileSystem(m_OplogStoragePath, Visit); + DirectoryContent DirContent; + GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); - for (const std::string& Dir : Visit.Dirs) + for (const std::filesystem::path& DirPath : DirContent.Directories) { - OpenOplog(Dir); + OpenOplog(PathToUtf8(DirPath.stem())); } } @@ -900,34 +883,18 @@ ProjectStore::BasePathForProject(std::string_view ProjectId) void ProjectStore::DiscoverProjects() { - FileSystemTraversal Traversal; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& File, - [[maybe_unused]] uint64_t FileSize) override - { - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override - { - Dirs.push_back(PathToUtf8(DirectoryName)); - return false; - } - - std::vector<std::string> Dirs; - } Visit; - if (!std::filesystem::exists(m_ProjectBasePath)) { return; } - Traversal.TraverseFileSystem(m_ProjectBasePath, Visit); + DirectoryContent DirContent; + GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); - for (const auto& Dir : Visit.Dirs) + for (const std::filesystem::path& DirPath : DirContent.Directories) { - Project* Project = OpenProject(Dir); + std::string DirName = PathToUtf8(DirPath.stem()); + Project* Project = OpenProject(DirName); if (Project) { @@ -976,7 +943,7 @@ ProjectStore::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; const auto Guard = - MakeGuard([this, &Timer] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + MakeGuard([&] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); DiscoverProjects(); diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index dbf86cc13..2ec24b303 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -198,7 +198,8 @@ namespace detail { } { - PutRefResult RefResult = StorageSession.PutRef("requests"sv, + PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(), + "requests"sv, UpstreamData.TaskId, UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), ZenContentType::kCbObject); @@ -292,7 +293,7 @@ namespace detail { std::set<IoHash> Keys; std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys); Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), @@ -309,7 +310,7 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); + CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash)); Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -339,7 +340,7 @@ namespace detail { std::set<IoHash> Keys; std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys); Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), @@ -356,7 +357,8 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); + CloudCacheResult Result = + Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -691,7 +693,8 @@ namespace detail { std::map<IoHash, IoBuffer> BinaryData; { - CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + CloudCacheResult ObjectRefResult = + Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject); Log().debug("Get ref {} Bytes={} Duration={}s Result={}", ResultHash, ObjectRefResult.Bytes, @@ -718,7 +721,8 @@ namespace detail { std::set<IoHash> NeededData; if (OutputHash != IoHash::Zero) { - GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash); + GetObjectReferencesResult ObjectReferenceResult = + Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash); Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", ResultHash, ObjectReferenceResult.References.size(), @@ -748,7 +752,7 @@ namespace detail { { continue; } - CloudCacheResult BlobResult = Session.GetBlob(Hash); + CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash); Log().debug("Get blob {} Bytes={} Duration={}s Result={}", Hash, BlobResult.Bytes, diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 4bec41a29..ddc6c49d2 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -83,12 +83,12 @@ CloudCacheSession::Authenticate() } CloudCacheResult -CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) +CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key) { ZEN_TRACE_CPU("HordeClient::GetDerivedData"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -115,19 +115,18 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke } CloudCacheResult -CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) +CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { - return GetDerivedData(BucketId, Key.ToHexString()); + return GetDerivedData(Namespace, BucketId, Key.ToHexString()); } CloudCacheResult -CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -155,10 +154,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte } CloudCacheResult -CloudCacheSession::GetBlob(const IoHash& Key) +CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -187,12 +186,12 @@ CloudCacheSession::GetBlob(const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetCompressedBlob(const IoHash& Key) +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -220,12 +219,12 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetObject(const IoHash& Key) +CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObject"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -253,14 +252,14 @@ CloudCacheSession::GetObject(const IoHash& Key) } CloudCacheResult -CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) +CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { ZEN_TRACE_CPU("HordeClient::PutDerivedData"); IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -289,13 +288,13 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke } CloudCacheResult -CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) +CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) { - return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); + return PutDerivedData(Namespace, BucketId, Key.ToHexString(), DerivedData); } PutRefResult -CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("HordeClient::PutRef"); @@ -304,8 +303,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -356,13 +354,13 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer } FinalizeRefResult -CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("HordeClient::FinalizeRef"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" + << RefHash.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -414,12 +412,12 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con } CloudCacheResult -CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) +CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -446,12 +444,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) } CloudCacheResult -CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -478,12 +476,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) } CloudCacheResult -CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) +CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("HordeClient::PutObject"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -510,13 +508,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) } CloudCacheResult -CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) +CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::RefExists"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -541,13 +538,12 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) } GetObjectReferencesResult -CloudCacheSession::GetObjectReferences(const IoHash& Key) +CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() - << "/references"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -585,39 +581,39 @@ CloudCacheSession::GetObjectReferences(const IoHash& Key) } CloudCacheResult -CloudCacheSession::BlobExists(const IoHash& Key) +CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("blobs"sv, Key); + return CacheTypeExists(Namespace, "blobs"sv, Key); } CloudCacheResult -CloudCacheSession::CompressedBlobExists(const IoHash& Key) +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("compressed-blobs"sv, Key); + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); } CloudCacheResult -CloudCacheSession::ObjectExists(const IoHash& Key) +CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("objects"sv, Key); + return CacheTypeExists(Namespace, "objects"sv, Key); } CloudCacheExistsResult -CloudCacheSession::BlobExists(const std::set<IoHash>& Keys) +CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("blobs"sv, Keys); + return CacheTypeExists(Namespace, "blobs"sv, Keys); } CloudCacheExistsResult -CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys) +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("compressed-blobs"sv, Keys); + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); } CloudCacheExistsResult -CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) +CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("objects"sv, Keys); + return CacheTypeExists(Namespace, "objects"sv, Keys); } CloudCacheResult @@ -685,11 +681,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t } std::vector<IoHash> -CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); + Uri << "/api/v1/s/" << Namespace; ZEN_UNUSED(BucketId, ChunkHashes); @@ -715,12 +711,12 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) } CloudCacheResult -CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -745,7 +741,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) } CloudCacheExistsResult -CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) { ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); @@ -758,7 +754,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas Body << "]"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exist"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -894,8 +890,8 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken( CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) : m_Log(zen::logging::Get("jupiter")) , m_ServiceUrl(Options.ServiceUrl) -, m_DdcNamespace(Options.DdcNamespace) -, m_BlobStoreNamespace(Options.BlobStoreNamespace) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index cff9a9ef1..3d9e6ea7b 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -95,38 +95,40 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); - CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); - CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); - CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType); - CloudCacheResult GetBlob(const IoHash& Key); - CloudCacheResult GetCompressedBlob(const IoHash& Key); - CloudCacheResult GetObject(const IoHash& Key); + CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key); + CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); - CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); - CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); - CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); + CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); - FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); + CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); - GetObjectReferencesResult GetObjectReferences(const IoHash& Key); + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); - CloudCacheResult BlobExists(const IoHash& Key); - CloudCacheResult CompressedBlobExists(const IoHash& Key); - CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key); - CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys); - CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); - CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); CloudCacheResult PostComputeTasks(IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); - std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + CloudCacheClient& Client() { return *m_CacheClient; }; private: inline spdlog::logger& Log() { return m_Log; } @@ -134,9 +136,9 @@ private: CloudCacheAccessToken GetAccessToken(bool RefreshToken = false); bool VerifyAccessToken(long StatusCode); - CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key); + CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); - CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys); + CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; @@ -189,8 +191,8 @@ public: ~CloudCacheClient(); CloudCacheAccessToken AcquireAccessToken(); - std::string_view DdcNamespace() const { return m_DdcNamespace; } - std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } std::string_view ComputeCluster() const { return m_ComputeCluster; } std::string_view ServiceUrl() const { return m_ServiceUrl; } @@ -199,8 +201,8 @@ public: private: spdlog::logger& m_Log; std::string m_ServiceUrl; - std::string m_DdcNamespace; - std::string m_BlobStoreNamespace; + std::string m_DefaultDdcNamespace; + std::string m_DefaultBlobStoreNamespace; std::string m_ComputeCluster; std::chrono::milliseconds m_ConnectTimeout{}; std::chrono::milliseconds m_Timeout{}; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 9758e7565..c397bb141 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -119,6 +119,22 @@ public: return m_RunState.IsRunning; } + virtual bool IsHealthy() const override + { + if (m_RunState.IsRunning) + { + for (const auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + return true; + } + } + } + + return false; + } + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); @@ -429,6 +445,12 @@ private: ////////////////////////////////////////////////////////////////////////// +bool +UpstreamApply::IsHealthy() const +{ + return false; +} + std::unique_ptr<UpstreamApply> UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) { diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index c6e38142c..1deaf00a5 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -23,7 +23,7 @@ class CbObjectWriter; class CidStore; class CloudCacheTokenProvider; class WorkerThreadPool; -class ZenCacheStore; +class ZenCacheNamespace; struct CloudCacheClientOptions; struct UpstreamAuthConfig; @@ -167,6 +167,7 @@ public: virtual ~UpstreamApply() = default; virtual bool Initialize() = 0; + virtual bool IsHealthy() const = 0; virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0; struct EnqueueResult diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index da0743f0a..98b4439c7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -160,11 +160,29 @@ namespace detail { } } + std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace) + { + if (Namespace == ZenCacheStore::DefaultNamespace) + { + return Session.Client().DefaultDdcNamespace(); + } + return Namespace; + } + + std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace) + { + if (Namespace == ZenCacheStore::DefaultNamespace) + { + return Session.Client().DefaultBlobStoreNamespace(); + } + return Namespace; + } + virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -173,13 +191,16 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + std::string_view DdcNamespace = GetActualDdcNamespace(Session, Namespace); + Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); if (Result.Success) { @@ -190,23 +211,22 @@ namespace detail { IoBuffer ContentBuffer; int NumAttachments = 0; - CacheRecord.IterateAttachments( - [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); - Result.Bytes += AttachmentResult.Bytes; - Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; - Result.ErrorCode = AttachmentResult.ErrorCode; + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + Result.Bytes += AttachmentResult.Bytes; + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) - { - Result.Response = AttachmentResult.Response; - ++NumAttachments; - } - else - { - Result.Success = false; - } - }); + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Result.Response = AttachmentResult.Response; + ++NumAttachments; + } + else + { + Result.Success = false; + } + }); if (NumAttachments != 1) { Result.Success = false; @@ -217,7 +237,7 @@ namespace detail { else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType); if (Result.Success && Type == ZenContentType::kCbPackage) { @@ -228,8 +248,8 @@ namespace detail { { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); - CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -279,7 +299,9 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); @@ -294,7 +316,9 @@ namespace detail { if (!Result.Error) { - CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + CloudCacheResult RefResult = + Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -305,8 +329,8 @@ namespace detail { if (ValidationResult == CbValidateError::None) { Record = LoadCompactBinaryObject(RefResult.Response); - Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Record.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -329,14 +353,15 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -360,7 +385,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); @@ -376,8 +402,9 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); - Payload = BlobResult.Response; + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); + Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -422,13 +449,18 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); if (m_UseLegacyDdc) { - Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); + Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); } else { - Result = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); + Result = Session.PutRef(BlobStoreNamespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + ZenContentType::kBinary); } } @@ -455,6 +487,7 @@ namespace detail { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, @@ -474,6 +507,7 @@ namespace detail { { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, RecordValue, MaxAttempts, @@ -519,6 +553,7 @@ namespace detail { PutUpstreamCacheResult PerformStructuredPut( CloudCacheSession& Session, + std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, @@ -527,7 +562,8 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { IoBuffer BlobBuffer; @@ -539,7 +575,7 @@ namespace detail { CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); + BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -560,7 +596,7 @@ namespace detail { PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { - RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -581,7 +617,7 @@ namespace detail { } const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -599,7 +635,7 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -708,14 +744,14 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -739,20 +775,24 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheRecords"; + << "GetCacheRecords"sv; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("Requests"sv); for (CacheKeyRequest* Request : Requests) { @@ -817,14 +857,16 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); + const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -848,7 +890,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); @@ -856,12 +899,16 @@ namespace detail { CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheChunks"; + << "GetCacheChunks"sv; + BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("ChunkRequests"sv); { for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -1010,7 +1057,11 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + PackagePayload, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1029,12 +1080,14 @@ namespace detail { CbPackage BatchPackage; CbObjectWriter BatchWriter; BatchWriter << "Method"sv - << "PutCacheValues"; + << "PutCacheValues"sv; BatchWriter.BeginObject("Params"sv); { // DefaultPolicy unspecified and expected to be Default + BatchWriter << "Namespace"sv << CacheRecord.Namespace; + BatchWriter.BeginArray("Requests"sv); { BatchWriter.BeginObject(); @@ -1075,7 +1128,8 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + Result = Session.PutCacheValue(CacheRecord.Namespace, + CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheRecord.ValueContentIds[Idx], Values[Idx]); @@ -1098,7 +1152,11 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1226,7 +1284,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1245,7 +1303,7 @@ public: GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(CacheKey, Type); + Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); } Stats.CacheGetCount.Increment(1); @@ -1273,7 +1331,9 @@ public: return {}; } - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1301,7 +1361,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); @@ -1338,7 +1398,9 @@ public: } } - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); @@ -1366,7 +1428,7 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); @@ -1403,7 +1465,9 @@ public: } } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheValue"); @@ -1421,7 +1485,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); + Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1451,7 +1515,7 @@ public: virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_RunState.IsRunning && m_Options.WriteUpstream) + if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { @@ -1517,7 +1581,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1531,7 +1595,8 @@ private: } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 6f18b3119..13548efc8 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -32,6 +32,7 @@ struct ZenStructuredCacheClientOptions; struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; + std::string Namespace; CacheKey Key; std::vector<IoHash> ValueContentIds; }; @@ -163,12 +164,15 @@ public: virtual UpstreamEndpointState GetState() = 0; virtual UpstreamEndpointStatus GetStatus() = 0; - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -196,11 +200,15 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 1ac4afe5c..efc75b5b4 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -408,10 +408,15 @@ ZenStructuredCacheSession::CheckHealth() } ZenCacheResult -ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) +ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -432,10 +437,18 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) +ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -457,10 +470,19 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash } ZenCacheResult -ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) +ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -485,10 +507,19 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) +ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index f70d9d06f..e8590f940 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -128,10 +128,18 @@ public: ~ZenStructuredCacheSession(); ZenCacheResult CheckHealth(); - ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); - ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); - ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); - ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); + ZenCacheResult GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type); + ZenCacheResult GetCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); + ZenCacheResult PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type); + ZenCacheResult PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); ZenCacheResult InvokeRpc(const CbPackage& Package); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index b0598d75b..53a046f80 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -1154,6 +1154,7 @@ test_main(int argc, char** argv) zen::zenhttp_forcelinktests(); zen::zenstore_forcelinktests(); zen::z$_forcelink(); + zen::z$service_forcelink(); zen::logging::InitializeLogging(); spdlog::set_level(spdlog::level::debug); |