From db2bee546354a03ca9a94a647161b041b3033491 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 28 Apr 2022 15:32:55 +0200 Subject: Reduce risk of reallocating backing std::vector in CbWriter::AddBinary Shard up g_MappingLock in IoBufferExtendedCore::Materialize() to reduce contention during high load Don't queue upstream cache records if we don't have any upstreams --- zenserver/upstream/upstreamcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index da0743f0a..dba80faa9 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1451,7 +1451,7 @@ public: virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_RunState.IsRunning && m_Options.WriteUpstream) + if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { -- cgit v1.2.3 From 5b95a4fba97aa66cec935ef3e0d969893223f9d6 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 4 May 2022 15:25:35 +0200 Subject: Add namespacecachestore layer to allow multiple structured cache namespaces --- zenserver/upstream/upstreamcache.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index dba80faa9..49f384774 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -19,6 +19,7 @@ #include #include +#include "cache/namespacecachestore.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "diag/logging.h" @@ -1173,7 +1174,7 @@ namespace detail { class UpstreamCacheImpl final : public UpstreamCache { public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + UpstreamCacheImpl(const UpstreamCacheOptions& Options, NamespaceCacheStore& CacheStore, CidStore& CidStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) @@ -1517,7 +1518,7 @@ private: ZenCacheValue CacheValue; std::vector Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get("", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1687,7 +1688,7 @@ private: spdlog::logger& m_Log; UpstreamCacheOptions m_Options; - ZenCacheStore& m_CacheStore; + NamespaceCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; @@ -1712,7 +1713,7 @@ UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, } std::unique_ptr -UpstreamCache::Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) +UpstreamCache::Create(const UpstreamCacheOptions& Options, NamespaceCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique(Options, CacheStore, CidStore); } -- cgit v1.2.3 From ef12415d287c9307c0c4774aeacff6c91966f693 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 4 May 2022 19:45:57 +0200 Subject: cleanup --- zenserver/upstream/upstreamcache.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 49f384774..c89227106 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -19,7 +19,6 @@ #include #include -#include "cache/namespacecachestore.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "diag/logging.h" @@ -1174,7 +1173,7 @@ namespace detail { class UpstreamCacheImpl final : public UpstreamCache { public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, NamespaceCacheStore& CacheStore, CidStore& CidStore) + UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) @@ -1688,7 +1687,7 @@ private: spdlog::logger& m_Log; UpstreamCacheOptions m_Options; - NamespaceCacheStore& m_CacheStore; + ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; @@ -1713,7 +1712,7 @@ UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, } std::unique_ptr -UpstreamCache::Create(const UpstreamCacheOptions& Options, NamespaceCacheStore& CacheStore, CidStore& CidStore) +UpstreamCache::Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique(Options, CacheStore, CidStore); } -- cgit v1.2.3 From 861a92d1ee6c54eeb9035190501baf8ea888591f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 May 2022 09:55:09 +0200 Subject: cleanup and review feedback --- zenserver/upstream/upstreamcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c89227106..c870e0773 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1517,7 +1517,7 @@ private: ZenCacheValue CacheValue; std::vector Payloads; - if (!m_CacheStore.Get("", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(ZenCacheStore::DefaultNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; -- cgit v1.2.3 From 33fa76a35a96cad1865854068e60c1ca0b53864e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 6 May 2022 16:42:27 +0200 Subject: parameterize namespace for upstream (first hack) --- zenserver/upstream/upstreamcache.cpp | 66 ++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 18 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c870e0773..2b52b8efa 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -175,11 +175,15 @@ namespace detail { if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + Result = + Session.GetDerivedData(CacheKey.Namespace, CacheKey.Bucket, CacheKey.Hash); // Session.Client().DdcNamespace(), } else if (Type == ZenContentType::kCompressedBinary) { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + Result = Session.GetRef(CacheKey.Namespace, + CacheKey.Bucket, + CacheKey.Hash, + ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace(), if (Result.Success) { @@ -192,7 +196,8 @@ namespace detail { CacheRecord.IterateAttachments( [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CloudCacheResult AttachmentResult = + Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -217,7 +222,10 @@ namespace detail { else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + Result = Session.GetRef(CacheKey.Namespace, + CacheKey.Bucket, + CacheKey.Hash, + AcceptType); // Session.Client().BlobStoreNamespace() if (Result.Success && Type == ZenContentType::kCbPackage) { @@ -229,7 +237,8 @@ namespace detail { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CloudCacheResult AttachmentResult = + Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -294,7 +303,10 @@ namespace detail { if (!Result.Error) { - CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + CloudCacheResult RefResult = Session.GetRef(CacheKey.Namespace, + CacheKey.Bucket, + CacheKey.Hash, + ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace() AppendResult(RefResult, Result); m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -306,7 +318,8 @@ namespace detail { { Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CloudCacheResult BlobResult = + Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -336,7 +349,7 @@ namespace detail { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); + const CloudCacheResult Result = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -376,7 +389,7 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), Request.ChunkId); Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -424,11 +437,18 @@ namespace detail { { if (m_UseLegacyDdc) { - Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); + Result = Session.PutDerivedData(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(), + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue); } else { - Result = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); + Result = Session.PutRef(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(), + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + ZenContentType::kBinary); } } @@ -539,7 +559,7 @@ namespace detail { CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); + BlobResult = Session.PutCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId, BlobBuffer); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -560,7 +580,11 @@ namespace detail { PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { - RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + RefResult = Session.PutRef(Key.Namespace, // Seesion.Client().BlobStoreNamespace(), + Key.Bucket, + Key.Hash, + ObjectBuffer, + ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -580,8 +604,9 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); + FinalizeRefResult FinalizeResult = + Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(), m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -599,7 +624,8 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + FinalizeResult = + Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(), m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -761,6 +787,7 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { + BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -871,6 +898,7 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); + BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,6 +1070,7 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { + BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1517,7 +1546,7 @@ private: ZenCacheValue CacheValue; std::vector Payloads; - if (!m_CacheStore.Get(ZenCacheStore::DefaultNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1531,7 +1560,8 @@ private: } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", + CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); -- cgit v1.2.3 From 41aa73ae51fd251969c6abf9ffafa5b40aad22e6 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 10 May 2022 09:28:38 +0200 Subject: Use configured namespace in Jupiter if not explicit namespace is given DdcNamespace -> DefaultDdcNamespace BlobStoreNamespace -> DefaultBlobStoreNamespace --- zenserver/upstream/upstreamcache.cpp | 120 ++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 58 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 2b52b8efa..cbb32b13e 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -160,6 +160,24 @@ namespace detail { } } + std::string_view GetActualDdcNamespace(CloudCacheSession& Session, const std::string Namespace) + { + if (Namespace == ZenCacheStore::DefaultNamespace) + { + return Session.Client().DefaultDdcNamespace(); + } + return Namespace; + } + + std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, const std::string Namespace) + { + if (Namespace == ZenCacheStore::DefaultNamespace) + { + return Session.Client().DefaultBlobStoreNamespace(); + } + return Namespace; + } + virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } @@ -173,17 +191,16 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - Result = - Session.GetDerivedData(CacheKey.Namespace, CacheKey.Bucket, CacheKey.Hash); // Session.Client().DdcNamespace(), + std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace); + Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) { - Result = Session.GetRef(CacheKey.Namespace, - CacheKey.Bucket, - CacheKey.Hash, - ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace(), + Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); if (Result.Success) { @@ -194,24 +211,22 @@ namespace detail { IoBuffer ContentBuffer; int NumAttachments = 0; - CacheRecord.IterateAttachments( - [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = - Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); - Result.Bytes += AttachmentResult.Bytes; - Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; - Result.ErrorCode = AttachmentResult.ErrorCode; + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + Result.Bytes += AttachmentResult.Bytes; + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) - { - Result.Response = AttachmentResult.Response; - ++NumAttachments; - } - else - { - Result.Success = false; - } - }); + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Result.Response = AttachmentResult.Response; + ++NumAttachments; + } + else + { + Result.Success = false; + } + }); if (NumAttachments != 1) { Result.Success = false; @@ -222,10 +237,7 @@ namespace detail { else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; - Result = Session.GetRef(CacheKey.Namespace, - CacheKey.Bucket, - CacheKey.Hash, - AcceptType); // Session.Client().BlobStoreNamespace() + Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType); if (Result.Success && Type == ZenContentType::kCbPackage) { @@ -236,9 +248,8 @@ namespace detail { { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); - CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = - Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); + CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -303,10 +314,9 @@ namespace detail { if (!Result.Error) { - CloudCacheResult RefResult = Session.GetRef(CacheKey.Namespace, - CacheKey.Bucket, - CacheKey.Hash, - ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace() + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + CloudCacheResult RefResult = + Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -317,9 +327,8 @@ namespace detail { if (ValidationResult == CbValidateError::None) { Record = LoadCompactBinaryObject(RefResult.Response); - Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = - Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); + Record.IterateAttachments([&](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -342,14 +351,15 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -389,8 +399,9 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - const CloudCacheResult BlobResult = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), Request.ChunkId); - Payload = BlobResult.Response; + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); + Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -435,16 +446,14 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace); if (m_UseLegacyDdc) { - Result = Session.PutDerivedData(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(), - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - RecordValue); + Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); } else { - Result = Session.PutRef(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(), + Result = Session.PutRef(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, @@ -547,7 +556,8 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span ValueContentIds, std::string& OutReason) -> bool { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace); + const auto PutBlobs = [&](std::span ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { IoBuffer BlobBuffer; @@ -559,7 +569,7 @@ namespace detail { CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId, BlobBuffer); + BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -580,11 +590,7 @@ namespace detail { PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { - RefResult = Session.PutRef(Key.Namespace, // Seesion.Client().BlobStoreNamespace(), - Key.Bucket, - Key.Hash, - ObjectBuffer, - ZenContentType::kCbObject); + RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -604,9 +610,8 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); - FinalizeRefResult FinalizeResult = - Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(), + const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -624,8 +629,7 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - FinalizeResult = - Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(), + FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); -- cgit v1.2.3 From d418b9794dddb550bf90c55a4353f2ed1a764168 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 12 May 2022 10:43:41 +0200 Subject: string_view vs string lifetime fix --- zenserver/upstream/upstreamcache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index cbb32b13e..52513abe9 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -160,7 +160,7 @@ namespace detail { } } - std::string_view GetActualDdcNamespace(CloudCacheSession& Session, const std::string Namespace) + std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace) { if (Namespace == ZenCacheStore::DefaultNamespace) { @@ -169,7 +169,7 @@ namespace detail { return Namespace; } - std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, const std::string Namespace) + std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace) { if (Namespace == ZenCacheStore::DefaultNamespace) { -- cgit v1.2.3 From a9130d34b5318b0da5d3547c432a8734213fbe9b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 19 May 2022 11:37:25 +0200 Subject: Keep Namespace out of CacheKey and store it on request level RPC requests now has a Namespace field under Params instead of one Namespace per cache key Fall back to legacy upstream HTTP URI format if default namespace is requested --- zenserver/upstream/upstreamcache.cpp | 103 +++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 36 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 52513abe9..98b4439c7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -182,7 +182,7 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -191,11 +191,11 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace); + std::string_view DdcNamespace = GetActualDdcNamespace(Session, Namespace); Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) @@ -299,7 +299,9 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); @@ -314,7 +316,7 @@ namespace detail { if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); CloudCacheResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); @@ -351,14 +353,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -383,7 +385,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); @@ -399,7 +402,7 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); Payload = BlobResult.Response; @@ -446,7 +449,7 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); if (m_UseLegacyDdc) { Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); @@ -484,6 +487,7 @@ namespace detail { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, @@ -503,6 +507,7 @@ namespace detail { { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, RecordValue, MaxAttempts, @@ -548,6 +553,7 @@ namespace detail { PutUpstreamCacheResult PerformStructuredPut( CloudCacheSession& Session, + std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, @@ -556,7 +562,7 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const auto PutBlobs = [&](std::span ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { @@ -738,14 +744,14 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -769,20 +775,24 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheRecords"; + << "GetCacheRecords"sv; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("Requests"sv); for (CacheKeyRequest* Request : Requests) { @@ -791,7 +801,6 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { - BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -848,14 +857,16 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); + const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -879,7 +890,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); @@ -887,12 +899,16 @@ namespace detail { CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheChunks"; + << "GetCacheChunks"sv; + BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("ChunkRequests"sv); { for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -902,7 +918,6 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); - BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,7 +1057,11 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + PackagePayload, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1061,12 +1080,14 @@ namespace detail { CbPackage BatchPackage; CbObjectWriter BatchWriter; BatchWriter << "Method"sv - << "PutCacheValues"; + << "PutCacheValues"sv; BatchWriter.BeginObject("Params"sv); { // DefaultPolicy unspecified and expected to be Default + BatchWriter << "Namespace"sv << CacheRecord.Namespace; + BatchWriter.BeginArray("Requests"sv); { BatchWriter.BeginObject(); @@ -1074,7 +1095,6 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { - BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1108,7 +1128,8 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + Result = Session.PutCacheValue(CacheRecord.Namespace, + CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheRecord.ValueContentIds[Idx], Values[Idx]); @@ -1131,7 +1152,11 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1259,7 +1284,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1278,7 +1303,7 @@ public: GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(CacheKey, Type); + Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); } Stats.CacheGetCount.Increment(1); @@ -1306,7 +1331,9 @@ public: return {}; } - virtual void GetCacheRecords(std::span Requests, OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::string_view Namespace, + std::span Requests, + OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1334,7 +1361,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward(Params)); @@ -1371,7 +1398,9 @@ public: } } - virtual void GetCacheValues(std::span CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::string_view Namespace, + std::span CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); @@ -1399,7 +1428,7 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward(Params)); @@ -1436,7 +1465,9 @@ public: } } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheValue"); @@ -1454,7 +1485,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); + Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1550,7 +1581,7 @@ private: ZenCacheValue CacheValue; std::vector Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1565,7 +1596,7 @@ private: else { ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", - CacheRecord.Key.Namespace, + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); -- cgit v1.2.3