aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp120
1 files changed, 55 insertions, 65 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index ab8fe8704..e438a840a 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1,7 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "upstreamcache.h"
-#include "jupiter.h"
#include "zen.h"
#include <zencore/blockingqueue.h>
@@ -15,11 +14,15 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/auth/authmgr.h>
-#include <zenstore/cidstore.h>
-#include <zenutil/packageformat.h>
+#include <zenhttp/httpclientauth.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/cache/structuredcachestore.h>
+#include <zenstore/cidstore.h>
+
+#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/jupiter/jupitersession.h>
+
#include "cache/httpstructuredcache.h"
#include "diag/logging.h"
@@ -85,7 +88,7 @@ namespace detail {
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
+ JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
: m_AuthMgr(Mgr)
, m_Log(zen::logging::Get("upstream"))
{
@@ -93,30 +96,27 @@ namespace detail {
m_Info.Name = Options.Name;
m_Info.Url = Options.ServiceUrl;
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ std::function<HttpClientAccessToken()> TokenProvider;
if (AuthConfig.OAuthUrl.empty() == false)
{
- TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials(
+ TokenProvider = httpclientauth::CreateFromOAuthClientCredentials(
{.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret});
}
- else if (AuthConfig.OpenIdProvider.empty() == false)
+ else if (!AuthConfig.OpenIdProvider.empty())
{
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(AuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider);
+ }
+ else if (!AuthConfig.AccessToken.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken);
}
else
{
- CloudCacheAccessToken AccessToken{.Value = std::string(AuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
-
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr);
}
- m_Client = new CloudCacheClient(Options, std::move(TokenProvider));
+ m_Client = new JupiterClient(Options, std::move(TokenProvider));
}
virtual ~JupiterUpstreamEndpoint() {}
@@ -134,8 +134,8 @@ namespace detail {
return {.State = UpstreamEndpointState::kOk};
}
- CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.Authenticate();
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ const JupiterResult Result = Session.Authenticate();
if (Result.Success)
{
@@ -160,20 +160,11 @@ namespace detail {
}
}
- std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace)
- {
- if (Namespace == ZenCacheStore::DefaultNamespace)
- {
- return Session.Client().DefaultDdcNamespace();
- }
- return Namespace;
- }
-
- std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace)
+ std::string_view GetActualBlobStoreNamespace(std::string_view Namespace)
{
if (Namespace == ZenCacheStore::DefaultNamespace)
{
- return Session.Client().DefaultBlobStoreNamespace();
+ return m_Client->DefaultBlobStoreNamespace();
}
return Namespace;
}
@@ -190,10 +181,10 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
- CloudCacheResult Result;
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ JupiterResult Result;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
if (Type == ZenContentType::kCompressedBinary)
{
@@ -209,7 +200,7 @@ namespace detail {
int NumAttachments = 0;
CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.ReceivedBytes += AttachmentResult.ReceivedBytes;
Result.SentBytes += AttachmentResult.SentBytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
@@ -249,7 +240,7 @@ namespace detail {
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.ReceivedBytes += AttachmentResult.ReceivedBytes;
Result.SentBytes += AttachmentResult.SentBytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
@@ -310,7 +301,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheKeyRequest* Request : Requests)
@@ -322,9 +313,8 @@ namespace detail {
double ElapsedSeconds = 0.0;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- CloudCacheResult RefResult =
- Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
ElapsedSeconds = RefResult.ElapsedSeconds;
@@ -337,7 +327,7 @@ namespace detail {
{
Record = LoadCompactBinaryObject(RefResult.Response);
Record.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -375,9 +365,9 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -408,7 +398,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
@@ -422,8 +412,8 @@ namespace detail {
bool IsCompressed = false;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult BlobResult =
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ const JupiterResult BlobResult =
Request.ChunkId == IoHash::Zero
? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId)
: Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
@@ -463,7 +453,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheValueRequest* RequestPtr : CacheValueRequests)
@@ -477,9 +467,9 @@ namespace detail {
bool IsCompressed = false;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- IoHash PayloadHash;
- const CloudCacheResult BlobResult =
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ IoHash PayloadHash;
+ const JupiterResult BlobResult =
Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash);
ElapsedSeconds = BlobResult.ElapsedSeconds;
Payload = BlobResult.Response;
@@ -543,14 +533,14 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
if (CacheRecord.Type == ZenContentType::kBinary)
{
- CloudCacheResult Result;
+ JupiterResult Result;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace);
Result = Session.PutRef(BlobStoreNamespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
@@ -632,7 +622,7 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out)
{
Out.Success &= Result.Success;
Out.Bytes += gsl::narrow<int64_t>(Result.ReceivedBytes);
@@ -645,7 +635,7 @@ namespace detail {
};
PutUpstreamCacheResult PerformStructuredPut(
- CloudCacheSession& Session,
+ JupiterSession& Session,
std::string_view Namespace,
const CacheKey& Key,
IoBuffer ObjectBuffer,
@@ -655,7 +645,7 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
for (const IoHash& ValueContentId : ValueContentIds)
{
@@ -665,7 +655,7 @@ namespace detail {
return false;
}
- CloudCacheResult BlobResult;
+ JupiterResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer);
@@ -760,12 +750,12 @@ namespace detail {
LoggerRef Log() { return m_Log; }
- AuthMgr& m_AuthMgr;
- LoggerRef m_Log;
- UpstreamEndpointInfo m_Info;
- UpstreamStatus m_Status;
- UpstreamEndpointStats m_Stats;
- RefPtr<CloudCacheClient> m_Client;
+ AuthMgr& m_AuthMgr;
+ LoggerRef m_Log;
+ UpstreamEndpointInfo m_Info;
+ UpstreamStatus m_Status;
+ UpstreamEndpointStats m_Stats;
+ RefPtr<JupiterClient> m_Client;
};
class ZenUpstreamEndpoint final : public UpstreamEndpoint
@@ -2129,7 +2119,7 @@ UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Optio
}
std::unique_ptr<UpstreamEndpoint>
-UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
+UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
{
return std::make_unique<detail::JupiterUpstreamEndpoint>(Options, AuthConfig, Mgr);
}