aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2022-01-24 11:11:10 +0100
committerPer Larsson <[email protected]>2022-01-24 11:11:10 +0100
commitdc6becffb513280170958f94e18c1b2966ade4d1 (patch)
treec7f9cccafcc21e241abdecde6f5219ab1009aff6
parentFormat fix. (diff)
downloadzen-dc6becffb513280170958f94e18c1b2966ade4d1.tar.xz
zen-dc6becffb513280170958f94e18c1b2966ade4d1.zip
Refactored upstream cache to better handle different states in prep for dynamic auth tokens.
-rw-r--r--zenserver/auth/authservice.cpp37
-rw-r--r--zenserver/auth/authservice.h22
-rw-r--r--zenserver/cache/structuredcache.cpp51
-rw-r--r--zenserver/cache/structuredcache.h32
-rw-r--r--zenserver/config.cpp7
-rw-r--r--zenserver/config.h1
-rw-r--r--zenserver/upstream/jupiter.h1
-rw-r--r--zenserver/upstream/upstream.h8
-rw-r--r--zenserver/upstream/upstreamcache.cpp690
-rw-r--r--zenserver/upstream/upstreamcache.h82
-rw-r--r--zenserver/upstream/upstreamservice.cpp208
-rw-r--r--zenserver/upstream/upstreamservice.h25
-rw-r--r--zenserver/upstream/zen.h1
-rw-r--r--zenserver/zenserver.cpp78
14 files changed, 804 insertions, 439 deletions
diff --git a/zenserver/auth/authservice.cpp b/zenserver/auth/authservice.cpp
new file mode 100644
index 000000000..c6def15b4
--- /dev/null
+++ b/zenserver/auth/authservice.cpp
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <auth/authservice.h>
+#include <zencore/string.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+HttpAuthService::HttpAuthService()
+{
+ m_Router.RegisterRoute(
+ "token",
+ [this](HttpRouterRequest& RouterRequest) {
+ HttpServerRequest& ServerRequest = RouterRequest.ServerRequest();
+ ServerRequest.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kPost);
+}
+
+HttpAuthService::~HttpAuthService()
+{
+}
+
+const char*
+HttpAuthService::BaseUri() const
+{
+ return "/auth/";
+}
+
+void
+HttpAuthService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ m_Router.HandleRequest(Request);
+}
+
+} // namespace zen
diff --git a/zenserver/auth/authservice.h b/zenserver/auth/authservice.h
new file mode 100644
index 000000000..30b2b5864
--- /dev/null
+++ b/zenserver/auth/authservice.h
@@ -0,0 +1,22 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+namespace zen {
+
+class HttpAuthService final : public zen::HttpService
+{
+public:
+ HttpAuthService();
+ virtual ~HttpAuthService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ HttpRequestRouter m_Router;
+};
+
+} // namespace zen
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 2675590e0..5918d5178 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -59,17 +59,17 @@ struct AttachmentCount
//////////////////////////////////////////////////////////////////////////
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- std::unique_ptr<UpstreamCache> UpstreamCache)
+HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
, m_CidStore(InCidStore)
-, m_UpstreamCache(std::move(UpstreamCache))
+, m_UpstreamCache(UpstreamCache)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -206,7 +206,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
const bool PartialOnError = (Policy & CachePolicy::PartialOnError) == CachePolicy::PartialOnError;
- const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
+ const bool QueryUpstream = (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
bool Success = false;
ZenCacheValue LocalCacheValue;
@@ -292,7 +292,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
- if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType);
+ if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType);
UpstreamResult.Success)
{
Success = true;
@@ -437,7 +437,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
const HttpContentType ContentType = Request.RequestContentType();
- const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote));
+ const bool StoreUpstream = (Policy & CachePolicy::StoreRemote) == CachePolicy::StoreRemote;
Body.SetContentType(ContentType);
@@ -448,8 +448,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (StoreUpstream)
{
- ZEN_ASSERT(m_UpstreamCache);
- m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -492,8 +491,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (StoreUpstream && !IsPartialRecord)
{
- ZEN_ASSERT(m_UpstreamCache);
- m_UpstreamCache->EnqueueUpstream(
+ m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
}
@@ -574,8 +572,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (StoreUpstream && !IsPartialRecord)
{
- ZEN_ASSERT(m_UpstreamCache);
- m_UpstreamCache->EnqueueUpstream(
+ m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
}
@@ -611,11 +608,11 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
{
IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
bool InUpstreamCache = false;
- const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+ const bool QueryUpstream = !Payload && (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
+ if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
@@ -825,7 +822,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError);
const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments);
- const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote) && m_UpstreamCache;
+ const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote);
for (CbFieldView KeyView : Params["CacheKeys"sv])
{
@@ -896,7 +893,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
++KeyIndex;
}
- if (!UpstreamRequests.empty() && m_UpstreamCache)
+ if (!UpstreamRequests.empty())
{
const auto OnCacheRecordGetComplete =
[this, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
@@ -973,7 +970,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
};
- m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
}
CbObjectWriter ResponseObject;
@@ -1156,7 +1153,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
++RequestIndex;
}
- if (!UpstreamRequests.empty() && m_UpstreamCache)
+ if (!UpstreamRequests.empty())
{
const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) {
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
@@ -1183,7 +1180,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
};
- m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
+ m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
}
CbPackage RpcResponse;
@@ -1242,13 +1239,9 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
Cbo.EndObject();
-
- if (m_UpstreamCache)
- {
- Cbo.BeginObject("upstream");
- m_UpstreamCache->GetStatus(Cbo);
- Cbo.EndObject();
- }
+ Cbo.BeginObject("upstream");
+ m_UpstreamCache.GetStatus(Cbo);
+ Cbo.EndObject();
Cbo.BeginObject("cas");
Cbo.BeginObject("size");
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 9ee7da99b..1bf3940e7 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -55,11 +55,11 @@ enum class CachePolicy : uint32_t;
class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
- HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- std::unique_ptr<UpstreamCache> UpstreamCache);
+ HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
@@ -97,17 +97,17 @@ private:
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
- spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- ZenCacheStore& m_CacheStore;
- HttpStatsService& m_StatsService;
- HttpStatusService& m_StatusService;
- CidStore& m_CidStore;
- std::unique_ptr<UpstreamCache> m_UpstreamCache;
- uint64_t m_LastScrubTime = 0;
- metrics::OperationTiming m_HttpRequests;
- metrics::OperationTiming m_UpstreamGetRequestTiming;
- CacheStats m_CacheStats;
+ spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
+ ZenCacheStore& m_CacheStore;
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ CidStore& m_CidStore;
+ UpstreamCache& m_UpstreamCache;
+ uint64_t m_LastScrubTime = 0;
+ metrics::OperationTiming m_HttpRequests;
+ metrics::OperationTiming m_UpstreamGetRequestTiming;
+ CacheStats m_CacheStats;
};
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index da37efbb7..4afe012dd 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -271,13 +271,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("cache",
"",
- "upstream-stats",
- "Collect performance metrics for upstream endpoints",
- cxxopts::value<bool>(ServerOptions.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
- "");
-
- options.add_option("cache",
- "",
"upstream-connect-timeout-ms",
"Connect timeout in millisecond(s). Default 5000 ms.",
cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.ConnectTimeoutMilliseconds)->default_value("5000"),
diff --git a/zenserver/config.h b/zenserver/config.h
index e994c248e..2c31e7bd9 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -54,7 +54,6 @@ struct ZenUpstreamCacheConfig
int32_t ConnectTimeoutMilliseconds = 5000;
int32_t TimeoutMilliseconds = 0;
UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
- bool StatsEnabled = false;
};
struct ZenCacheEvictionPolicy
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 68c7361e0..bb1797393 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -132,6 +132,7 @@ private:
struct CloudCacheClientOptions
{
+ std::string_view Name;
std::string_view ServiceUrl;
std::string_view DdcNamespace;
std::string_view BlobStoreNamespace;
diff --git a/zenserver/upstream/upstream.h b/zenserver/upstream/upstream.h
new file mode 100644
index 000000000..a57301206
--- /dev/null
+++ b/zenserver/upstream/upstream.h
@@ -0,0 +1,8 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <upstream/jupiter.h>
+#include <upstream/upstreamcache.h>
+#include <upstream/upstreamservice.h>
+#include <upstream/zen.h>
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 65624ef17..3d6641a4f 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -25,6 +25,7 @@
#include <algorithm>
#include <atomic>
+#include <shared_mutex>
#include <thread>
#include <unordered_map>
@@ -34,6 +35,52 @@ using namespace std::literals;
namespace detail {
+ class UpstreamStatus
+ {
+ public:
+ UpstreamEndpointState EndpointState() const { return static_cast<UpstreamEndpointState>(m_State.load(std::memory_order_relaxed)); }
+
+ UpstreamEndpointStatus EndpointStatus() const
+ {
+ const UpstreamEndpointState State = EndpointState();
+ {
+ std::unique_lock _(m_Mutex);
+ return {.Reason = m_ErrorText, .State = State};
+ }
+ }
+
+ void Set(UpstreamEndpointState NewState)
+ {
+ m_State.store(static_cast<uint32_t>(NewState), std::memory_order_relaxed);
+ {
+ std::unique_lock _(m_Mutex);
+ m_ErrorText.clear();
+ }
+ }
+
+ void Set(UpstreamEndpointState NewState, std::string ErrorText)
+ {
+ m_State.store(static_cast<uint32_t>(NewState), std::memory_order_relaxed);
+ {
+ std::unique_lock _(m_Mutex);
+ m_ErrorText = std::move(ErrorText);
+ }
+ }
+
+ void SetFromErrorCode(int32_t ErrorCode, std::string_view ErrorText)
+ {
+ if (ErrorCode != 0)
+ {
+ Set(ErrorCode == 401 ? UpstreamEndpointState::kUnauthorized : UpstreamEndpointState::kError, std::string(ErrorText));
+ }
+ }
+
+ private:
+ mutable std::mutex m_Mutex;
+ std::string m_ErrorText;
+ std::atomic_uint32_t m_State;
+ };
+
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
@@ -41,7 +88,8 @@ namespace detail {
: m_Log(zen::logging::Get("upstream"))
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
- m_Info.Name = "Horde"sv;
+ ZEN_ASSERT(!Options.Name.empty());
+ m_Info.Name = Options.Name;
m_Info.Url = Options.ServiceUrl;
m_Client = new CloudCacheClient(Options);
}
@@ -50,27 +98,45 @@ namespace detail {
virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; }
- virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
-
- virtual bool IsHealthy() const override { return m_HealthOk.load(); }
-
- virtual UpstreamEndpointHealth CheckHealth() override
+ virtual UpstreamEndpointStatus Initialize() override
{
try
{
+ if (m_Status.EndpointState() == UpstreamEndpointState::kOk)
+ {
+ return {.State = UpstreamEndpointState::kOk};
+ }
+
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.Authenticate();
- m_HealthOk = Result.Success && Result.ErrorCode == 0;
+ if (Result.Success)
+ {
+ m_Status.Set(UpstreamEndpointState::kOk);
+ }
+ else if (Result.ErrorCode != 0)
+ {
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+ }
+ else
+ {
+ m_Status.Set(UpstreamEndpointState::kUnauthorized);
+ }
- return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
+ return m_Status.EndpointStatus();
}
catch (std::exception& Err)
{
- return {.Reason = Err.what(), .Ok = false};
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
+ return {.Reason = Err.what(), .State = GetState()};
}
}
+ virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); }
+
+ virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
+
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord");
@@ -127,6 +193,8 @@ namespace detail {
}
}
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
if (Result.ErrorCode == 0)
{
return {.Value = Result.Response,
@@ -136,13 +204,13 @@ namespace detail {
}
else
{
- m_HealthOk = false;
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
}
catch (std::exception& Err)
{
- m_HealthOk = false;
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -170,6 +238,8 @@ namespace detail {
CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
+ m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
+
if (RefResult.ErrorCode == 0)
{
const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All);
@@ -180,6 +250,8 @@ namespace detail {
CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
AppendResult(BlobResult, Result);
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+
if (BlobResult.ErrorCode == 0)
{
if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response)))
@@ -187,17 +259,9 @@ namespace detail {
Package.AddAttachment(CbAttachment(Chunk));
}
}
- else
- {
- m_HealthOk = false;
- }
});
}
}
- else
- {
- m_HealthOk = false;
- }
}
OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package});
@@ -215,6 +279,8 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
if (Result.ErrorCode == 0)
{
return {.Value = Result.Response,
@@ -224,13 +290,13 @@ namespace detail {
}
else
{
- m_HealthOk = false;
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
}
catch (std::exception& Err)
{
- m_HealthOk = false;
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -255,7 +321,8 @@ namespace detail {
Payload = BlobResult.Response;
AppendResult(BlobResult, Result);
- m_HealthOk = BlobResult.ErrorCode == 0;
+
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
}
OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
@@ -292,7 +359,7 @@ namespace detail {
}
}
- m_HealthOk = Result.ErrorCode == 0;
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
return {.Reason = std::move(Result.Reason),
.Bytes = Result.Bytes,
@@ -323,7 +390,7 @@ namespace detail {
BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
}
- m_HealthOk = BlobResult.ErrorCode == 0;
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (!BlobResult.Success)
{
@@ -344,7 +411,7 @@ namespace detail {
RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject);
}
- m_HealthOk = RefResult.ErrorCode == 0;
+ m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
if (!RefResult.Success)
{
@@ -366,7 +433,8 @@ namespace detail {
const IoHash RefHash = IoHash::HashBuffer(RecordValue);
FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
- m_HealthOk = FinalizeResult.ErrorCode == 0;
+
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
if (!FinalizeResult.Success)
{
@@ -385,7 +453,8 @@ namespace detail {
}
FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
- m_HealthOk = FinalizeResult.ErrorCode == 0;
+
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
if (!FinalizeResult.Success)
{
@@ -420,7 +489,8 @@ namespace detail {
}
catch (std::exception& Err)
{
- m_HealthOk = false;
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -444,11 +514,10 @@ namespace detail {
spdlog::logger& m_Log;
UpstreamEndpointInfo m_Info;
+ UpstreamStatus m_Status;
+ UpstreamEndpointStats m_Stats;
bool m_UseLegacyDdc;
- std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
- UpstreamEndpointStats m_Stats;
- std::atomic_bool m_HealthOk{false};
};
class ZenUpstreamEndpoint final : public UpstreamEndpoint
@@ -466,7 +535,7 @@ namespace detail {
public:
ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
: m_Log(zen::logging::Get("upstream"))
- , m_Info({.Name = std::string("Zen")})
+ , m_Info({.Name = std::string(Options.Name)})
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
{
@@ -480,62 +549,43 @@ namespace detail {
virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; }
- virtual UpstreamEndpointHealth Initialize() override
- {
- const ZenEndpoint& Ep = GetEndpoint();
- if (Ep.Ok)
- {
- m_Info.Url = Ep.Url;
- m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
-
- m_HealthOk = true;
- return {.Ok = true};
- }
-
- m_HealthOk = false;
- return {.Reason = Ep.Reason};
- }
-
- virtual bool IsHealthy() const override { return m_HealthOk; }
-
- virtual UpstreamEndpointHealth CheckHealth() override
+ virtual UpstreamEndpointStatus Initialize() override
{
try
{
- if (m_Client.IsNull())
+ if (m_Status.EndpointState() == UpstreamEndpointState::kOk)
{
- const ZenEndpoint& Ep = GetEndpoint();
- if (Ep.Ok)
- {
- m_Info.Url = Ep.Url;
- m_Client =
- new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
-
- m_HealthOk = true;
- return {.Ok = true};
- }
-
- return {.Reason = Ep.Reason};
+ return {.State = UpstreamEndpointState::kOk};
}
- ZenStructuredCacheSession Session(*m_Client);
- ZenCacheResult Result;
+ const ZenEndpoint& Ep = GetEndpoint();
- for (int32_t Attempt = 0, MaxAttempts = 2; Attempt < MaxAttempts && !Result.Success; ++Attempt)
+ m_Info.Url = Ep.Url;
+
+ if (Ep.Ok)
{
- Result = Session.CheckHealth();
+ m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
+ m_Status.Set(UpstreamEndpointState::kOk);
+ }
+ else
+ {
+ m_Status.Set(UpstreamEndpointState::kError, Ep.Reason);
}
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk};
+ return m_Status.EndpointStatus();
}
catch (std::exception& Err)
{
- return {.Reason = Err.what(), .Ok = false};
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
+ return {.Reason = Err.what(), .State = GetState()};
}
}
+ virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); }
+
+ virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
+
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord");
@@ -545,6 +595,8 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
if (Result.ErrorCode == 0)
{
return {.Value = Result.Response,
@@ -554,13 +606,13 @@ namespace detail {
}
else
{
- m_HealthOk = false;
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
}
catch (std::exception& Err)
{
- m_HealthOk = false;
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -608,6 +660,8 @@ namespace detail {
Result = Session.InvokeRpc(BatchRequest.Save());
}
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
if (Result.Success)
{
if (BatchResponse.TryLoad(Result.Response))
@@ -621,10 +675,6 @@ namespace detail {
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
- else if (Result.ErrorCode)
- {
- m_HealthOk = false;
- }
for (size_t Index : KeyIndex)
{
@@ -643,6 +693,8 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
if (Result.ErrorCode == 0)
{
return {.Value = Result.Response,
@@ -652,13 +704,13 @@ namespace detail {
}
else
{
- m_HealthOk = false;
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
}
catch (std::exception& Err)
{
- m_HealthOk = false;
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -713,6 +765,8 @@ namespace detail {
Result = Session.InvokeRpc(BatchRequest.Save());
}
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
if (Result.Success)
{
if (BatchResponse.TryLoad(Result.Response))
@@ -736,10 +790,6 @@ namespace detail {
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
}
}
- else if (Result.ErrorCode)
- {
- m_HealthOk = false;
- }
for (size_t Index : RequestIndex)
{
@@ -789,10 +839,10 @@ namespace detail {
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type);
-
- m_HealthOk = Result.ErrorCode == 0;
}
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
TotalBytes = Result.Bytes;
TotalElapsedSeconds = Result.ElapsedSeconds;
}
@@ -807,10 +857,10 @@ namespace detail {
CacheRecord.Key.Hash,
CacheRecord.PayloadIds[Idx],
Payloads[Idx]);
-
- m_HealthOk = Result.ErrorCode == 0;
}
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
@@ -827,10 +877,10 @@ namespace detail {
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type);
-
- m_HealthOk = Result.ErrorCode == 0;
}
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
}
@@ -842,7 +892,8 @@ namespace detail {
}
catch (std::exception& Err)
{
- m_HealthOk = false;
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
+
return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -885,109 +936,18 @@ namespace detail {
spdlog::logger& m_Log;
UpstreamEndpointInfo m_Info;
+ UpstreamStatus m_Status;
+ UpstreamEndpointStats m_Stats;
std::vector<ZenEndpoint> m_Endpoints;
std::chrono::milliseconds m_ConnectTimeout;
std::chrono::milliseconds m_Timeout;
RefPtr<ZenStructuredCacheClient> m_Client;
- UpstreamEndpointStats m_Stats;
- std::atomic_bool m_HealthOk{false};
};
} // namespace detail
//////////////////////////////////////////////////////////////////////////
-struct UpstreamStats
-{
- static constexpr uint64_t MaxSampleCount = 1000ull;
-
- UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
-
- void Add(spdlog::logger& Logger,
- UpstreamEndpoint& Endpoint,
- const GetUpstreamCacheResult& Result,
- const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
- {
- UpstreamEndpointStats& Stats = Endpoint.Stats();
-
- if (Result.Error)
- {
- Stats.ErrorCount++;
- }
- else if (Result.Success)
- {
- Stats.HitCount++;
- Stats.DownBytes.fetch_add(Result.Bytes);
- Stats.TimeDownMs.fetch_add(uint64_t(Result.ElapsedSeconds * 1000.0));
- }
- else
- {
- Stats.MissCount++;
- }
-
- if (m_Enabled && m_SampleCount++ % MaxSampleCount)
- {
- Dump(Logger, Endpoints);
- }
- }
-
- void Add(spdlog::logger& Logger,
- UpstreamEndpoint& Endpoint,
- const PutUpstreamCacheResult& Result,
- const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
- {
- UpstreamEndpointStats& Stats = Endpoint.Stats();
- if (Result.Success)
- {
- Stats.UpCount++;
- Stats.UpBytes.fetch_add(Result.Bytes);
- Stats.TimeUpMs.fetch_add(uint64_t(Result.ElapsedSeconds * 1000.0));
- }
- else
- {
- Stats.ErrorCount++;
- }
-
- if (m_Enabled && m_SampleCount++ % MaxSampleCount)
- {
- Dump(Logger, Endpoints);
- }
- }
-
- void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
- {
- for (auto& Ep : Endpoints)
- {
- // These stats will not be totally correct as the numbers are not captured atomically
- UpstreamEndpointStats& Stats = Ep->Stats();
- const uint64_t HitCount = Stats.HitCount;
- const uint64_t MissCount = Stats.MissCount;
- const double DownMBytes = double(Stats.DownBytes) / 1024.0 / 1024.0;
- const double SecondsDown = double(Stats.TimeDownMs) / 1000.0;
- const double UpMBytes = double(Stats.UpBytes) / 1024.0 / 1024.0;
- const double SecondsUp = double(Stats.TimeUpMs) / 1000.0;
-
- const double UpSpeed = UpMBytes > 0 ? UpMBytes / SecondsUp : 0.0;
- const double DownSpeed = DownMBytes > 0 ? DownMBytes / SecondsDown : 0.0;
- const uint64_t TotalCount = HitCount + MissCount;
- const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0;
-
- Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->GetEndpointInfo().Name,
- HitRate,
- DownMBytes,
- DownSpeed,
- UpMBytes,
- UpSpeed);
- }
- }
-
- bool m_Enabled;
- std::atomic_uint64_t m_SampleCount = {};
-};
-
-//////////////////////////////////////////////////////////////////////////
-
class DefaultUpstreamCache final : public UpstreamCache
{
public:
@@ -996,71 +956,87 @@ public:
, m_Options(Options)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
- , m_Stats(Options.StatsEnabled)
{
}
virtual ~DefaultUpstreamCache() { Shutdown(); }
- virtual bool Initialize() override
+ virtual void Initialize() override
{
- for (auto& Endpoint : m_Endpoints)
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
{
- const UpstreamEndpointHealth Health = Endpoint->Initialize();
- const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
-
- if (Health.Ok)
- {
- ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url);
- }
- else
- {
- ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
- }
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
}
- m_RunState.IsRunning = !m_Endpoints.empty();
+ m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this);
+ m_RunState.IsRunning = true;
+ }
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override
+ {
+ const UpstreamEndpointStatus Status = Endpoint->Initialize();
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
- if (m_RunState.IsRunning)
+ ZEN_INFO("register endpoint '{} - {}' {}", Info.Name, Info.Url, ToString(Status.State));
+
+ // Register endpoint even if it fails, the health monitor thread will probe failing endpoint(s)
+ std::unique_lock<std::shared_mutex> _(m_EndpointsMutex);
+ m_Endpoints.emplace_back(std::move(Endpoint));
+ }
+
+ virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) override
+ {
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
+ for (auto& Ep : m_Endpoints)
{
- for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ if (!Fn(*Ep))
{
- m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
+ break;
}
-
- m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this);
}
-
- return m_RunState.IsRunning;
}
- virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
-
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
ZEN_TRACE_CPU("Upstream::GetCacheRecord");
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
if (m_Options.ReadUpstream)
{
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy())
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
{
- const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type);
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ continue;
+ }
- if (Result.Success)
- {
- return Result;
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ GetUpstreamCacheResult Result;
+ {
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
+ Result = Endpoint->GetCacheRecord(CacheKey, Type);
+ }
- if (Result.Error)
- {
- ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
- }
+ Stats.CacheGetCount.Increment(1);
+ Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+
+ if (Result.Success)
+ {
+ Stats.CacheHitCount.Increment(1);
+
+ return Result;
+ }
+
+ if (Result.Error)
+ {
+ Stats.CacheErrorCount.Increment(1);
+
+ ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
}
}
@@ -1075,42 +1051,62 @@ public:
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
- std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end());
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
+ std::vector<size_t> RemainingKeys(KeyIndex.begin(), KeyIndex.end());
if (m_Options.ReadUpstream)
{
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy() && !MissingKeys.empty())
+ if (RemainingKeys.empty())
+ {
+ break;
+ }
+
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
+ {
+ continue;
+ }
+
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ std::vector<size_t> Missing;
+ GetUpstreamCacheResult Result;
{
- std::vector<size_t> Missing;
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
+ Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
if (Params.Record)
{
OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+
+ Stats.CacheHitCount.Increment(1);
}
else
{
Missing.push_back(Params.KeyIndex);
}
});
+ }
- if (Result.Error)
- {
- ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
- }
+ Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
+ Stats.CacheGetTotalBytes.Increment(Result.Bytes);
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
- MissingKeys = std::move(Missing);
+ if (Result.Error)
+ {
+ Stats.CacheErrorCount.Increment(1);
+
+ ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
+
+ RemainingKeys = std::move(Missing);
}
}
- for (size_t Index : MissingKeys)
+ for (size_t Index : RemainingKeys)
{
OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
}
@@ -1122,43 +1118,62 @@ public:
{
ZEN_TRACE_CPU("Upstream::GetCachePayloads");
- std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end());
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
+ std::vector<size_t> RemainingKeys(RequestIndex.begin(), RequestIndex.end());
if (m_Options.ReadUpstream)
{
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy() && !MissingPayloads.empty())
+ if (RemainingKeys.empty())
{
- std::vector<size_t> Missing;
+ break;
+ }
- auto Result =
- Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) {
- if (Params.Payload)
- {
- OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
- }
- else
- {
- Missing.push_back(Params.RequestIndex);
- }
- });
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
+ {
+ continue;
+ }
- if (Result.Error)
- {
- ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ std::vector<size_t> Missing;
+ GetUpstreamCacheResult Result;
+ {
+ metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
+
+ Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ });
+ }
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
- MissingPayloads = std::move(Missing);
+ Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
+ Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+
+ if (Result.Error)
+ {
+ Stats.CacheErrorCount.Increment(1);
+
+ ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
+
+ RemainingKeys = std::move(Missing);
}
}
- for (size_t Index : MissingPayloads)
+ for (size_t Index : RemainingKeys)
{
OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
}
@@ -1172,23 +1187,37 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy())
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
{
- const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ continue;
+ }
- if (Result.Success)
- {
- return Result;
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ GetUpstreamCacheResult Result;
- if (Result.Error)
- {
- ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
- }
+ {
+ metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
+ Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
+ }
+
+ Stats.CacheGetCount.Increment(1);
+ Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+
+ if (Result.Success)
+ {
+ Stats.CacheHitCount.Increment(1);
+
+ return Result;
+ }
+
+ if (Result.Error)
+ {
+ Stats.CacheErrorCount.Increment(1);
+
+ ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
}
}
@@ -1196,7 +1225,7 @@ public:
return {};
}
- virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream)
{
@@ -1208,11 +1237,7 @@ public:
{
ProcessCacheRecord(std::move(CacheRecord));
}
-
- return {.Success = true};
}
-
- return {};
}
virtual void GetStatus(CbObjectWriter& Status) override
@@ -1225,22 +1250,35 @@ public:
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
{
- const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo();
- Status.BeginObject();
- Status << "name" << Info.Name;
- Status << "url" << Info.Url;
- Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+ const UpstreamEndpointInfo& EpInfo = Ep->GetEndpointInfo();
+ const UpstreamEndpointStatus EpStatus = Ep->GetStatus();
+ UpstreamEndpointStats& EpStats = Ep->Stats();
- UpstreamEndpointStats& Stats = Ep->Stats();
- const uint64_t HitCount = Stats.HitCount;
- const uint64_t MissCount = Stats.MissCount;
- const uint64_t TotalCount = HitCount + MissCount;
- const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0;
+ Status.BeginObject();
+ Status << "name" << EpInfo.Name;
+ Status << "url" << EpInfo.Url;
+ Status << "state" << ToString(EpStatus.State);
+ Status << "reason" << EpStatus.Reason;
- Status << "hit_ratio" << HitRate;
- Status << "downloaded_mb" << (double(Stats.DownBytes) / 1024.0 / 1024.0);
- Status << "uploaded_mb" << Stats.UpBytes;
- Status << "error_count" << Stats.ErrorCount;
+ Status.BeginObject("cache"sv);
+ {
+ const int64_t GetCount = EpStats.CacheGetCount.Value();
+ const int64_t HitCount = EpStats.CacheHitCount.Value();
+ const int64_t ErrorCount = EpStats.CacheErrorCount.Value();
+ const double HitRatio = GetCount > 0 ? double(HitCount) / double(GetCount) : 0.0;
+ const double ErrorRatio = GetCount > 0 ? double(ErrorCount) / double(GetCount) : 0.0;
+
+ metrics::EmitSnapshot("get_requests"sv, EpStats.CacheGetRequestTiming, Status);
+ Status << "get_bytes" << EpStats.CacheGetTotalBytes.Value();
+ Status << "get_count" << GetCount;
+ Status << "hit_count" << HitCount;
+ Status << "hit_ratio" << HitRatio;
+ Status << "error_count" << ErrorCount;
+ Status << "error_ratio" << ErrorRatio;
+ metrics::EmitSnapshot("put_requests"sv, EpStats.CachePutRequestTiming, Status);
+ Status << "put_bytes" << EpStats.CachePutTotalBytes.Value();
+ }
+ Status.EndObject();
Status.EndObject();
}
@@ -1277,21 +1315,31 @@ private:
}
}
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->IsHealthy())
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
{
- const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ continue;
+ }
- if (!Result.Success)
- {
- ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- Endpoint->GetEndpointInfo().Url,
- Result.Reason);
- }
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ PutUpstreamCacheResult Result;
+ {
+ metrics::OperationTiming::Scope Scope(Stats.CachePutRequestTiming);
+ Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ }
+
+ Stats.CachePutTotalBytes.Increment(Result.Bytes);
+
+ if (!Result.Success)
+ {
+ ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ Endpoint->GetEndpointInfo().Url,
+ Result.Reason);
}
}
}
@@ -1334,21 +1382,35 @@ private:
try
{
- for (auto& Endpoint : m_Endpoints)
+ std::vector<UpstreamEndpoint*> Endpoints;
+
{
- if (!Endpoint->IsHealthy())
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
+ for (auto& Endpoint : m_Endpoints)
{
- const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
- if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ if (Endpoint->GetState() == UpstreamEndpointState::kError ||
+ Endpoint->GetState() == UpstreamEndpointState::kUnauthorized)
{
- ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason);
- }
- else
- {
- ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
+ Endpoints.push_back(Endpoint.get());
}
}
}
+
+ for (auto& Endpoint : Endpoints)
+ {
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
+ const UpstreamEndpointStatus Status = Endpoint->Initialize();
+
+ if (Status.State == UpstreamEndpointState::kOk)
+ {
+ ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url);
+ }
+ else
+ {
+ ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Status.Reason);
+ }
+ }
}
catch (std::exception& Err)
{
@@ -1403,7 +1465,7 @@ private:
ZenCacheStore& m_CacheStore;
CidStore& m_CidStore;
UpstreamQueue m_UpstreamQueue;
- UpstreamStats m_Stats;
+ std::shared_mutex m_EndpointsMutex;
std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
std::vector<std::thread> m_UpstreamThreads;
std::thread m_EndpointMonitorThread;
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 20c89a574..16d8c7929 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
+#include <zencore/stats.h>
#include <zencore/zencore.h>
#include <zenutil/cache/cache.h>
@@ -35,7 +36,6 @@ struct UpstreamCacheOptions
uint32_t ThreadCount = 4;
bool ReadUpstream = true;
bool WriteUpstream = true;
- bool StatsEnabled = false;
};
struct UpstreamError
@@ -63,24 +63,6 @@ struct PutUpstreamCacheResult
bool Success = false;
};
-struct UpstreamEndpointHealth
-{
- std::string Reason;
- bool Ok = false;
-};
-
-struct UpstreamEndpointStats
-{
- std::atomic_uint64_t HitCount{};
- std::atomic_uint64_t MissCount{};
- std::atomic_uint64_t UpCount{};
- std::atomic_uint64_t ErrorCount{};
- std::atomic_uint64_t UpBytes{};
- std::atomic_uint64_t DownBytes{};
- std::atomic_uint64_t TimeUpMs{};
- std::atomic_uint64_t TimeDownMs{};
-};
-
struct CacheRecordGetCompleteParams
{
const CacheKey& Key;
@@ -100,6 +82,51 @@ struct CachePayloadGetCompleteParams
using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>;
+struct UpstreamEndpointStats
+{
+ metrics::OperationTiming CacheGetRequestTiming;
+ metrics::OperationTiming CachePutRequestTiming;
+ metrics::Counter CacheGetTotalBytes;
+ metrics::Counter CachePutTotalBytes;
+ metrics::Counter CacheGetCount;
+ metrics::Counter CacheHitCount;
+ metrics::Counter CacheErrorCount;
+};
+
+enum class UpstreamEndpointState : uint32_t
+{
+ kDisabled,
+ kUnauthorized,
+ kError,
+ kOk
+};
+
+inline std::string_view
+ToString(UpstreamEndpointState State)
+{
+ using namespace std::literals;
+
+ switch (State)
+ {
+ case UpstreamEndpointState::kDisabled:
+ return "Disabled"sv;
+ case UpstreamEndpointState::kUnauthorized:
+ return "Unauthorized"sv;
+ case UpstreamEndpointState::kError:
+ return "Error"sv;
+ case UpstreamEndpointState::kOk:
+ return "Ok"sv;
+ default:
+ return "Unknown"sv;
+ }
+}
+
+struct UpstreamEndpointStatus
+{
+ std::string Reason;
+ UpstreamEndpointState State;
+};
+
struct UpstreamEndpointInfo
{
std::string Name;
@@ -116,11 +143,11 @@ public:
virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;
- virtual UpstreamEndpointHealth Initialize() = 0;
+ virtual UpstreamEndpointStatus Initialize() = 0;
- virtual bool IsHealthy() const = 0;
+ virtual UpstreamEndpointState GetState() = 0;
- virtual UpstreamEndpointHealth CheckHealth() = 0;
+ virtual UpstreamEndpointStatus GetStatus() = 0;
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
@@ -150,10 +177,12 @@ class UpstreamCache
public:
virtual ~UpstreamCache() = default;
- virtual bool Initialize() = 0;
+ virtual void Initialize() = 0;
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
+ virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
+
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
@@ -167,12 +196,7 @@ public:
std::span<size_t> RequestIndex,
OnCachePayloadGetComplete&& OnComplete) = 0;
- struct EnqueueResult
- {
- bool Success = false;
- };
-
- virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
};
diff --git a/zenserver/upstream/upstreamservice.cpp b/zenserver/upstream/upstreamservice.cpp
new file mode 100644
index 000000000..1cfd1df85
--- /dev/null
+++ b/zenserver/upstream/upstreamservice.cpp
@@ -0,0 +1,208 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <upstream/jupiter.h>
+#include <upstream/upstreamcache.h>
+#include <upstream/upstreamservice.h>
+#include <upstream/zen.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/string.h>
+
+#include <json11.hpp>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace {
+ json11::Json TryGetJson(IoBuffer Body, HttpContentType ContentType, std::string& OutError)
+ {
+ if (!Body)
+ {
+ OutError = "No data"sv;
+ return json11::Json();
+ }
+
+ if ((ContentType == HttpContentType::kJSON || ContentType == HttpContentType::kCbObject) == false)
+ {
+ OutError = "Invalid content type"sv;
+ return json11::Json();
+ }
+
+ if (ContentType == ZenContentType::kJSON)
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Body.GetData()), Body.GetSize());
+ return json11::Json::parse(JsonText, OutError);
+ }
+
+ if (CbObject Obj = LoadCompactBinaryObject(Body))
+ {
+ ExtendableStringBuilder<512> Sb;
+ return json11::Json::parse(Obj.ToJson(Sb).ToString(), OutError);
+ }
+
+ OutError = "Invalid compact binary"sv;
+ return json11::Json();
+ }
+
+ void WriteErrorResponse(HttpServerRequest& Request, std::string_view Property, std::string_view Reason)
+ {
+ CbObjectWriter Response;
+ Response << "Result"sv << false;
+ Response.BeginObject("Error"sv);
+ Response << "Property"sv << Property << "Reason"sv << Reason;
+ Response.EndObject();
+
+ Request.WriteResponse(HttpResponseCode::BadRequest, Response.Save());
+ }
+
+ void WriteSuccessResponse(HttpServerRequest& Request)
+ {
+ CbObjectWriter Response;
+ Response << "Result"sv << true;
+
+ Request.WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+} // namespace
+
+HttpUpstreamService::HttpUpstreamService(UpstreamCache& Upstream) : m_Upstream(Upstream)
+{
+ m_Router.RegisterRoute(
+ "endpoints",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Writer;
+ Writer.BeginArray("Endpoints"sv);
+ m_Upstream.IterateEndpoints([&Writer](UpstreamEndpoint& Ep) {
+ UpstreamEndpointInfo Info = Ep.GetEndpointInfo();
+ UpstreamEndpointStatus Status = Ep.GetStatus();
+
+ Writer.BeginObject();
+ Writer << "Name"sv << Info.Name;
+ Writer << "Url"sv << Info.Url;
+ Writer << "State"sv << ToString(Status.State);
+ Writer << "Reason"sv << Status.Reason;
+ Writer.EndObject();
+
+ return true;
+ });
+ Writer.EndArray();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Writer.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "endpoints",
+ [this](HttpRouterRequest& RouterRequest) {
+ HttpServerRequest& ServerRequest = RouterRequest.ServerRequest();
+ std::string JsonError;
+
+ json11::Json Json = TryGetJson(ServerRequest.ReadPayload(), ServerRequest.RequestContentType(), JsonError);
+
+ if (!JsonError.empty())
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, JsonError);
+ }
+
+ const auto Type = Json["Type"].string_value();
+ const auto Name = Json["Name"].string_value();
+ const auto Url = Json["Url"].string_value();
+ const auto Namespace = Json["Namespace"].string_value();
+ const auto OAuthProvider = Json["OAuthProvider"].string_value();
+ const auto OAuthClientId = Json["OAuthClientId"].string_value();
+ const auto OAuthSecret = Json["OAuthSecret"].string_value();
+ const auto OAuthToken = Json["OAuthToken"].string_value();
+
+ if ((Type == "Horde"sv || Type == "Zen"sv) == false)
+ {
+ return WriteErrorResponse(ServerRequest, "Type"sv, "Invalid endpoint type, must be Zen or Horde"sv);
+ }
+
+ if (Name.empty())
+ {
+ return WriteErrorResponse(ServerRequest, "Name"sv, "Invalid endpoint name"sv);
+ }
+
+ if (Url.empty())
+ {
+ return WriteErrorResponse(ServerRequest, "Url"sv, "Invalid endpoint URL"sv);
+ }
+
+ bool IsNameUnique = true;
+ m_Upstream.IterateEndpoints([&Name, &IsNameUnique](UpstreamEndpoint& Ep) {
+ IsNameUnique = IsNameUnique && Ep.GetEndpointInfo().Name != Name;
+ return IsNameUnique;
+ });
+
+ if (IsNameUnique == false)
+ {
+ return WriteErrorResponse(ServerRequest, "Url"sv, "Endpoint name is not unique"sv);
+ }
+
+ std::unique_ptr<zen::UpstreamEndpoint> Endpoint;
+
+ if (Type == "Zen"sv)
+ {
+ std::vector<std::string> Urls;
+ Urls.push_back(Json["Url"].string_value());
+ Endpoint = zen::MakeZenUpstreamEndpoint({.Name = Name, .Urls = Urls});
+ }
+ else
+ {
+ if (Namespace.empty())
+ {
+ return WriteErrorResponse(ServerRequest, "Namespace"sv, "Invalid Horde namespace"sv);
+ }
+
+ if (OAuthProvider.empty())
+ {
+ return WriteErrorResponse(ServerRequest, "OAuthProvider"sv, "Invalid Horde OAuth provider URL"sv);
+ }
+
+ if (OAuthToken.empty())
+ {
+ if (OAuthClientId.empty())
+ {
+ return WriteErrorResponse(ServerRequest, "OAuthClientId"sv, "Invalid Horde OAuth client ID"sv);
+ }
+
+ if (OAuthSecret.empty())
+ {
+ return WriteErrorResponse(ServerRequest, "OAuthSecret"sv, "Invalid Horde OAuth secret"sv);
+ }
+ }
+
+ const zen::CloudCacheClientOptions Options = {.Name = Name,
+ .ServiceUrl = Url,
+ .DdcNamespace = Namespace,
+ .BlobStoreNamespace = Namespace,
+ .OAuthProvider = OAuthProvider,
+ .OAuthClientId = OAuthClientId,
+ .OAuthSecret = OAuthSecret,
+ .AccessToken = OAuthToken};
+
+ Endpoint = zen::MakeJupiterUpstreamEndpoint(Options);
+ }
+
+ m_Upstream.RegisterEndpoint(std::move(Endpoint));
+
+ WriteSuccessResponse(ServerRequest);
+ },
+ HttpVerb::kPost);
+}
+
+HttpUpstreamService::~HttpUpstreamService()
+{
+}
+
+const char*
+HttpUpstreamService::BaseUri() const
+{
+ return "/upstream/";
+}
+
+void
+HttpUpstreamService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ m_Router.HandleRequest(Request);
+}
+
+} // namespace zen
diff --git a/zenserver/upstream/upstreamservice.h b/zenserver/upstream/upstreamservice.h
new file mode 100644
index 000000000..0a42198c2
--- /dev/null
+++ b/zenserver/upstream/upstreamservice.h
@@ -0,0 +1,25 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+namespace zen {
+
+class UpstreamCache;
+
+class HttpUpstreamService final : public zen::HttpService
+{
+public:
+ HttpUpstreamService(UpstreamCache& Upstream);
+ virtual ~HttpUpstreamService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ HttpRequestRouter m_Router;
+ UpstreamCache& m_Upstream;
+};
+
+} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index e97ce755d..c2be2165a 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -102,6 +102,7 @@ struct ZenCacheResult
struct ZenStructuredCacheClientOptions
{
+ std::string_view Name;
std::string_view Url;
std::span<std::string const> Urls;
std::chrono::milliseconds ConnectTimeout{};
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 60691222b..b8648c8ee 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -99,6 +99,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
//
#include "admin/admin.h"
+#include "auth/authservice.h"
#include "cache/structuredcache.h"
#include "cache/structuredcachestore.h"
#include "compute/apply.h"
@@ -110,9 +111,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "projectstore.h"
#include "testing/httptest.h"
#include "testing/launch.h"
-#include "upstream/jupiter.h"
-#include "upstream/upstreamcache.h"
-#include "upstream/zen.h"
+#include "upstream/upstream.h"
#include "zenstore/gc.h"
#include "zenstore/scrub.h"
@@ -203,6 +202,10 @@ public:
m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass);
m_Http->Initialize(ServerOptions.BasePort);
+
+ m_AuthService = std::make_unique<zen::HttpAuthService>();
+ m_Http->RegisterService(*m_AuthService);
+
m_Http->RegisterService(m_HealthService);
m_Http->RegisterService(m_StatsService);
m_Http->RegisterService(m_StatusService);
@@ -278,11 +281,6 @@ public:
m_Http->RegisterService(m_CasService);
- if (m_StructuredCacheService)
- {
- m_Http->RegisterService(*m_StructuredCacheService);
- }
-
#if ZEN_WITH_COMPUTE_SERVICES
if (m_HttpLaunchService)
{
@@ -529,6 +527,7 @@ private:
}
zen::Ref<zen::HttpServer> m_Http;
+ std::unique_ptr<zen::HttpAuthService> m_AuthService;
zen::HttpStatusService m_StatusService;
zen::HttpStatsService m_StatsService;
zen::CasGc m_CasGc;
@@ -543,6 +542,8 @@ private:
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
zen::Ref<zen::LocalProjectService> m_LocalProjectService;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
+ std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
+ std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
zen::HttpAdminService m_AdminService{m_GcScheduler};
zen::HttpHealthService m_HealthService;
@@ -682,24 +683,23 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
ZEN_INFO("instantiating structured cache service");
m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache");
- std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
- {
- const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
-
- zen::UpstreamCacheOptions UpstreamOptions;
- UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
- UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
- if (UpstreamConfig.UpstreamThreadCount < 32)
- {
- UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
- }
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
- UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
- UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache);
+ m_UpstreamCache->Initialize();
+ if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
// Zen upstream
{
std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls;
@@ -723,10 +723,11 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
if (!ZenUrls.empty())
{
std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint =
- zen::MakeZenUpstreamEndpoint({.Urls = ZenUrls,
+ zen::MakeZenUpstreamEndpoint({.Name = "Zen"sv,
+ .Urls = ZenUrls,
.ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
.Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)});
- UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
}
}
@@ -736,7 +737,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
if (UpstreamConfig.JupiterConfig.UseProductionSettings)
{
Options =
- zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ zen::CloudCacheClientOptions{.Name = "Horde"sv,
+ .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
.DdcNamespace = "ue.ddc"sv,
.BlobStoreNamespace = "ue.ddc"sv,
.OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
@@ -749,7 +751,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
{
Options =
- zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
+ zen::CloudCacheClientOptions{.Name = "Horde"sv,
+ .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
.DdcNamespace = "ue.ddc"sv,
.BlobStoreNamespace = "ue.ddc"sv,
.OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
@@ -771,27 +774,16 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
if (!Options.ServiceUrl.empty())
{
std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
}
}
-
- if (UpstreamCache->Initialize())
- {
- ZEN_INFO("upstream cache active ({})",
- UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
- : UpstreamOptions.ReadUpstream ? "READONLY"
- : UpstreamOptions.WriteUpstream ? "WRITEONLY"
- : "DISABLED");
- }
- else
- {
- UpstreamCache.reset();
- ZEN_INFO("NOT using upstream cache");
- }
}
- m_StructuredCacheService.reset(
- new zen::HttpStructuredCacheService(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, std::move(UpstreamCache)));
+ m_StructuredCacheService =
+ std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache);
+
+ m_Http->RegisterService(*m_StructuredCacheService);
+ m_Http->RegisterService(*m_UpstreamService);
}
////////////////////////////////////////////////////////////////////////////////