aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp114
1 files changed, 51 insertions, 63 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 3e1d1fbd6..e438a840a 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -14,13 +14,14 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/auth/authmgr.h>
+#include <zenhttp/httpclientauth.h>
#include <zenhttp/packageformat.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/cidstore.h>
-#include <zenutil/jupiter.h>
+#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/jupiter/jupitersession.h>
#include "cache/httpstructuredcache.h"
#include "diag/logging.h"
@@ -87,7 +88,7 @@ namespace detail {
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
+ JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
: m_AuthMgr(Mgr)
, m_Log(zen::logging::Get("upstream"))
{
@@ -95,30 +96,27 @@ namespace detail {
m_Info.Name = Options.Name;
m_Info.Url = Options.ServiceUrl;
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ std::function<HttpClientAccessToken()> TokenProvider;
if (AuthConfig.OAuthUrl.empty() == false)
{
- TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials(
+ TokenProvider = httpclientauth::CreateFromOAuthClientCredentials(
{.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret});
}
- else if (AuthConfig.OpenIdProvider.empty() == false)
+ else if (!AuthConfig.OpenIdProvider.empty())
{
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(AuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider);
+ }
+ else if (!AuthConfig.AccessToken.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken);
}
else
{
- CloudCacheAccessToken AccessToken{.Value = std::string(AuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
-
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr);
}
- m_Client = new CloudCacheClient(Options, std::move(TokenProvider));
+ m_Client = new JupiterClient(Options, std::move(TokenProvider));
}
virtual ~JupiterUpstreamEndpoint() {}
@@ -136,8 +134,8 @@ namespace detail {
return {.State = UpstreamEndpointState::kOk};
}
- CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.Authenticate();
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ const JupiterResult Result = Session.Authenticate();
if (Result.Success)
{
@@ -162,20 +160,11 @@ namespace detail {
}
}
- std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace)
- {
- if (Namespace == ZenCacheStore::DefaultNamespace)
- {
- return Session.Client().DefaultDdcNamespace();
- }
- return Namespace;
- }
-
- std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace)
+ std::string_view GetActualBlobStoreNamespace(std::string_view Namespace)
{
if (Namespace == ZenCacheStore::DefaultNamespace)
{
- return Session.Client().DefaultBlobStoreNamespace();
+ return m_Client->DefaultBlobStoreNamespace();
}
return Namespace;
}
@@ -192,10 +181,10 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
- CloudCacheResult Result;
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ JupiterResult Result;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
if (Type == ZenContentType::kCompressedBinary)
{
@@ -211,7 +200,7 @@ namespace detail {
int NumAttachments = 0;
CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.ReceivedBytes += AttachmentResult.ReceivedBytes;
Result.SentBytes += AttachmentResult.SentBytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
@@ -251,7 +240,7 @@ namespace detail {
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.ReceivedBytes += AttachmentResult.ReceivedBytes;
Result.SentBytes += AttachmentResult.SentBytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
@@ -312,7 +301,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheKeyRequest* Request : Requests)
@@ -324,9 +313,8 @@ namespace detail {
double ElapsedSeconds = 0.0;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- CloudCacheResult RefResult =
- Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
ElapsedSeconds = RefResult.ElapsedSeconds;
@@ -339,7 +327,7 @@ namespace detail {
{
Record = LoadCompactBinaryObject(RefResult.Response);
Record.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -377,9 +365,9 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -410,7 +398,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
@@ -424,8 +412,8 @@ namespace detail {
bool IsCompressed = false;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult BlobResult =
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ const JupiterResult BlobResult =
Request.ChunkId == IoHash::Zero
? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId)
: Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
@@ -465,7 +453,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheValueRequest* RequestPtr : CacheValueRequests)
@@ -479,9 +467,9 @@ namespace detail {
bool IsCompressed = false;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- IoHash PayloadHash;
- const CloudCacheResult BlobResult =
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ IoHash PayloadHash;
+ const JupiterResult BlobResult =
Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash);
ElapsedSeconds = BlobResult.ElapsedSeconds;
Payload = BlobResult.Response;
@@ -545,14 +533,14 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
if (CacheRecord.Type == ZenContentType::kBinary)
{
- CloudCacheResult Result;
+ JupiterResult Result;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace);
Result = Session.PutRef(BlobStoreNamespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
@@ -634,7 +622,7 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out)
{
Out.Success &= Result.Success;
Out.Bytes += gsl::narrow<int64_t>(Result.ReceivedBytes);
@@ -647,7 +635,7 @@ namespace detail {
};
PutUpstreamCacheResult PerformStructuredPut(
- CloudCacheSession& Session,
+ JupiterSession& Session,
std::string_view Namespace,
const CacheKey& Key,
IoBuffer ObjectBuffer,
@@ -657,7 +645,7 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
for (const IoHash& ValueContentId : ValueContentIds)
{
@@ -667,7 +655,7 @@ namespace detail {
return false;
}
- CloudCacheResult BlobResult;
+ JupiterResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer);
@@ -762,12 +750,12 @@ namespace detail {
LoggerRef Log() { return m_Log; }
- AuthMgr& m_AuthMgr;
- LoggerRef m_Log;
- UpstreamEndpointInfo m_Info;
- UpstreamStatus m_Status;
- UpstreamEndpointStats m_Stats;
- RefPtr<CloudCacheClient> m_Client;
+ AuthMgr& m_AuthMgr;
+ LoggerRef m_Log;
+ UpstreamEndpointInfo m_Info;
+ UpstreamStatus m_Status;
+ UpstreamEndpointStats m_Stats;
+ RefPtr<JupiterClient> m_Client;
};
class ZenUpstreamEndpoint final : public UpstreamEndpoint
@@ -2131,7 +2119,7 @@ UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Optio
}
std::unique_ptr<UpstreamEndpoint>
-UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
+UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
{
return std::make_unique<detail::JupiterUpstreamEndpoint>(Options, AuthConfig, Mgr);
}