diff options
| author | Dan Engelbrecht <[email protected]> | 2025-01-23 12:49:46 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2025-01-23 12:49:46 +0100 |
| commit | 5d47e5946da5ccdba5bd2fb770b7bdfabb48fb4c (patch) | |
| tree | cb64f2d93a9f5e8b0e1529ab0dc3371602688a4f /src/zenserver/upstream/upstreamcache.cpp | |
| parent | changelog (diff) | |
| parent | handle special backslash followed by quote for paths (#279) (diff) | |
| download | zen-5d47e5946da5ccdba5bd2fb770b7bdfabb48fb4c.tar.xz zen-5d47e5946da5ccdba5bd2fb770b7bdfabb48fb4c.zip | |
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 114 |
1 files changed, 51 insertions, 63 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 3e1d1fbd6..e438a840a 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -14,13 +14,14 @@ #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenhttp/auth/authmgr.h> +#include <zenhttp/httpclientauth.h> #include <zenhttp/packageformat.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/cidstore.h> -#include <zenutil/jupiter.h> +#include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/jupiter/jupitersession.h> #include "cache/httpstructuredcache.h" #include "diag/logging.h" @@ -87,7 +88,7 @@ namespace detail { class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) + JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) : m_AuthMgr(Mgr) , m_Log(zen::logging::Get("upstream")) { @@ -95,30 +96,27 @@ namespace detail { m_Info.Name = Options.Name; m_Info.Url = Options.ServiceUrl; - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + std::function<HttpClientAccessToken()> TokenProvider; if (AuthConfig.OAuthUrl.empty() == false) { - TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials( + TokenProvider = httpclientauth::CreateFromOAuthClientCredentials( {.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret}); } - else if (AuthConfig.OpenIdProvider.empty() == false) + else if (!AuthConfig.OpenIdProvider.empty()) { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(AuthConfig.OpenIdProvider)]() { - AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider); + } + else if (!AuthConfig.AccessToken.empty()) + { + TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken); } else { - CloudCacheAccessToken AccessToken{.Value = std::string(AuthConfig.AccessToken), - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - - TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr); } - m_Client = new CloudCacheClient(Options, std::move(TokenProvider)); + m_Client = new JupiterClient(Options, std::move(TokenProvider)); } virtual ~JupiterUpstreamEndpoint() {} @@ -136,8 +134,8 @@ namespace detail { return {.State = UpstreamEndpointState::kOk}; } - CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.Authenticate(); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); + const JupiterResult Result = Session.Authenticate(); if (Result.Success) { @@ -162,20 +160,11 @@ namespace detail { } } - std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace) - { - if (Namespace == ZenCacheStore::DefaultNamespace) - { - return Session.Client().DefaultDdcNamespace(); - } - return Namespace; - } - - std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace) + std::string_view GetActualBlobStoreNamespace(std::string_view Namespace) { if (Namespace == ZenCacheStore::DefaultNamespace) { - return Session.Client().DefaultBlobStoreNamespace(); + return m_Client->DefaultBlobStoreNamespace(); } return Namespace; } @@ -192,10 +181,10 @@ namespace detail { try { - CloudCacheSession Session(m_Client); - CloudCacheResult Result; + JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterResult Result; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); if (Type == ZenContentType::kCompressedBinary) { @@ -211,7 +200,7 @@ namespace detail { int NumAttachments = 0; CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.ReceivedBytes += AttachmentResult.ReceivedBytes; Result.SentBytes += AttachmentResult.SentBytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; @@ -251,7 +240,7 @@ namespace detail { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.ReceivedBytes += AttachmentResult.ReceivedBytes; Result.SentBytes += AttachmentResult.SentBytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; @@ -312,7 +301,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords"); - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); GetUpstreamCacheResult Result; for (CacheKeyRequest* Request : Requests) @@ -324,9 +313,8 @@ namespace detail { double ElapsedSeconds = 0.0; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - CloudCacheResult RefResult = - Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); ElapsedSeconds = RefResult.ElapsedSeconds; @@ -339,7 +327,7 @@ namespace detail { { Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([&](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -377,9 +365,9 @@ namespace detail { try { - CloudCacheSession Session(m_Client); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -410,7 +398,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks"); - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); GetUpstreamCacheResult Result; for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -424,8 +412,8 @@ namespace detail { bool IsCompressed = false; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - const CloudCacheResult BlobResult = + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + const JupiterResult BlobResult = Request.ChunkId == IoHash::Zero ? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId) : Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); @@ -465,7 +453,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues"); - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); GetUpstreamCacheResult Result; for (CacheValueRequest* RequestPtr : CacheValueRequests) @@ -479,9 +467,9 @@ namespace detail { bool IsCompressed = false; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - IoHash PayloadHash; - const CloudCacheResult BlobResult = + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + IoHash PayloadHash; + const JupiterResult BlobResult = Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash); ElapsedSeconds = BlobResult.ElapsedSeconds; Payload = BlobResult.Response; @@ -545,14 +533,14 @@ namespace detail { try { - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); if (CacheRecord.Type == ZenContentType::kBinary) { - CloudCacheResult Result; + JupiterResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace); Result = Session.PutRef(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, @@ -634,7 +622,7 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: - static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out) { Out.Success &= Result.Success; Out.Bytes += gsl::narrow<int64_t>(Result.ReceivedBytes); @@ -647,7 +635,7 @@ namespace detail { }; PutUpstreamCacheResult PerformStructuredPut( - CloudCacheSession& Session, + JupiterSession& Session, std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, @@ -657,7 +645,7 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { @@ -667,7 +655,7 @@ namespace detail { return false; } - CloudCacheResult BlobResult; + JupiterResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); @@ -762,12 +750,12 @@ namespace detail { LoggerRef Log() { return m_Log; } - AuthMgr& m_AuthMgr; - LoggerRef m_Log; - UpstreamEndpointInfo m_Info; - UpstreamStatus m_Status; - UpstreamEndpointStats m_Stats; - RefPtr<CloudCacheClient> m_Client; + AuthMgr& m_AuthMgr; + LoggerRef m_Log; + UpstreamEndpointInfo m_Info; + UpstreamStatus m_Status; + UpstreamEndpointStats m_Stats; + RefPtr<JupiterClient> m_Client; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint @@ -2131,7 +2119,7 @@ UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Optio } std::unique_ptr<UpstreamEndpoint> -UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) +UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) { return std::make_unique<detail::JupiterUpstreamEndpoint>(Options, AuthConfig, Mgr); } |