aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-11-22 09:13:27 +0100
committerMartin Ridgers <[email protected]>2021-11-22 09:13:27 +0100
commitbe52dec8e8c1c5d3901d9f6d742c626931abff27 (patch)
treea832df6ed494ea770fec3ee57a258d3cc9384a13 /zenserver/upstream
parentInitial integration of Trace from UE5 via the --zentrace=y xmake config (diff)
parentComment out unused variables to fix warnings. (diff)
downloadzen-be52dec8e8c1c5d3901d9f6d742c626931abff27.tar.xz
zen-be52dec8e8c1c5d3901d9f6d742c626931abff27.zip
Merged main
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/upstreamapply.cpp55
-rw-r--r--zenserver/upstream/upstreamcache.cpp85
-rw-r--r--zenserver/upstream/upstreamcache.h10
3 files changed, 102 insertions, 48 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index a0c6a91cf..b96adef2a 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -801,9 +801,42 @@ namespace detail {
{
using namespace fmt::literals;
std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
- // TODO: Enable when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
- // CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false);
- CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false);
+ if (HostPlatform.empty())
+ {
+ Log().warn("process apply upstream FAILED, 'host' platform not provided");
+ return false;
+ }
+
+ int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32();
+ int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64();
+ bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();
+
+ // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
+ std::string Condition;
+ if (HostPlatform == "Win64" || HostPlatform == "Windows")
+ {
+ Condition = "OSFamily == 'Windows' && Pool == 'Win-RemoteExec'";
+ }
+ else if (HostPlatform == "Mac")
+ {
+ Condition = "OSFamily == 'MacOS'";
+ }
+ else
+ {
+ Condition = "OSFamily == '{}'"_format(HostPlatform);
+ }
+
+ std::map<std::string_view, int64_t> Resources;
+ if (LogicalCores > 0)
+ {
+ Resources["LogicalCores"sv] = LogicalCores;
+ }
+ if (Memory > 0)
+ {
+ Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL);
+ }
+
+ CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
const IoHash RequirementsId = Requirements.GetHash();
Data.Objects[RequirementsId] = std::move(Requirements);
Data.RequirementsId = RequirementsId;
@@ -957,7 +990,7 @@ namespace detail {
}
[[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
- const std::map<std::string_view, int32_t>& Resources,
+ const std::map<std::string_view, int64_t>& Resources,
const bool Exclusive)
{
CbObjectWriter Writer;
@@ -1110,11 +1143,11 @@ struct UpstreamApplyStats
UpstreamApplyEndpointStats& Stats = Ep->Stats();
const uint64_t PostCount = Stats.PostCount;
const uint64_t CompleteCount = Stats.CompleteCount;
- const uint64_t UpdateCount = Stats.UpdateCount;
- const double DownBytes = Stats.DownBytes;
- const double SecondsDown = Stats.SecondsDown;
- const double UpBytes = Stats.UpBytes;
- const double SecondsUp = Stats.SecondsUp;
+ // const uint64_t UpdateCount = Stats.UpdateCount;
+ const double DownBytes = Stats.DownBytes;
+ const double SecondsDown = Stats.SecondsDown;
+ const double UpBytes = Stats.UpBytes;
+ const double SecondsUp = Stats.SecondsUp;
const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
@@ -1254,8 +1287,8 @@ public:
UpstreamApplyEndpointStats& Stats = Ep->Stats();
const uint64_t PostCount = Stats.PostCount;
const uint64_t CompleteCount = Stats.CompleteCount;
- const uint64_t UpdateCount = Stats.UpdateCount;
- const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
+ // const uint64_t UpdateCount = Stats.UpdateCount;
+ const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
Status << "post_count" << PostCount;
Status << "complete_count" << PostCount;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index e4ccaadbe..616cd4146 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -40,13 +40,15 @@ namespace detail {
: m_Log(zen::logging::Get("upstream"))
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
- using namespace fmt::literals;
- m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl);
- m_Client = new CloudCacheClient(Options);
+ m_Info.Name = "Horde"sv;
+ m_Info.Url = Options.ServiceUrl;
+ m_Client = new CloudCacheClient(Options);
}
virtual ~JupiterUpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; }
+
virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
virtual bool IsHealthy() const override { return m_HealthOk.load(); }
@@ -68,8 +70,6 @@ namespace detail {
}
}
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
@@ -286,7 +286,12 @@ namespace detail {
}
}
- return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success};
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason),
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
else
{
@@ -312,6 +317,8 @@ namespace detail {
BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
}
+ m_HealthOk = BlobResult.ErrorCode == 0;
+
if (!BlobResult.Success)
{
OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason);
@@ -332,6 +339,8 @@ namespace detail {
Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject);
}
+ m_HealthOk = RefResult.ErrorCode == 0;
+
if (!RefResult.Success)
{
return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.Key.Bucket,
@@ -351,6 +360,7 @@ namespace detail {
const IoHash RefHash = IoHash::HashBuffer(RecordValue);
FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ m_HealthOk = FinalizeResult.ErrorCode == 0;
if (!FinalizeResult.Success)
{
@@ -368,6 +378,7 @@ namespace detail {
}
FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ m_HealthOk = FinalizeResult.ErrorCode == 0;
if (!FinalizeResult.Success)
{
@@ -400,6 +411,7 @@ namespace detail {
}
catch (std::exception& Err)
{
+ m_HealthOk = false;
return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -422,6 +434,7 @@ namespace detail {
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
+ UpstreamEndpointInfo m_Info;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -444,7 +457,7 @@ namespace detail {
public:
ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
: m_Log(zen::logging::Get("upstream"))
- , m_DisplayName("ZEN")
+ , m_Info({.Name = std::string("Zen")})
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
{
@@ -456,6 +469,8 @@ namespace detail {
~ZenUpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; }
+
virtual UpstreamEndpointHealth Initialize() override
{
using namespace fmt::literals;
@@ -463,9 +478,8 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
if (Ep.Ok)
{
- m_ServiceUrl = Ep.Url;
- m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
+ m_Info.Url = Ep.Url;
+ m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -488,10 +502,9 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
if (Ep.Ok)
{
- m_ServiceUrl = Ep.Url;
- m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Info.Url = Ep.Url;
m_Client =
- new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
+ new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -518,8 +531,6 @@ namespace detail {
}
}
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
@@ -819,10 +830,10 @@ namespace detail {
.ElapsedSeconds = TotalElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
m_HealthOk = false;
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -854,7 +865,7 @@ namespace detail {
for (const auto& Ep : m_Endpoints)
{
- ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
+ ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
}
return m_Endpoints.front();
@@ -863,9 +874,8 @@ namespace detail {
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- std::string m_ServiceUrl;
+ UpstreamEndpointInfo m_Info;
std::vector<ZenEndpoint> m_Endpoints;
- std::string m_DisplayName;
std::chrono::milliseconds m_ConnectTimeout;
std::chrono::milliseconds m_Timeout;
RefPtr<ZenStructuredCacheClient> m_Client;
@@ -954,7 +964,7 @@ struct UpstreamStats
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0;
Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
+ Ep->GetEndpointInfo().Name,
HitRate,
DownBytes,
DownSpeed,
@@ -988,13 +998,15 @@ public:
for (auto& Endpoint : m_Endpoints)
{
const UpstreamEndpointHealth Health = Endpoint->Initialize();
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
+
if (Health.Ok)
{
- ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url);
}
else
{
- ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
}
}
@@ -1034,7 +1046,7 @@ public:
if (Result.Error)
{
ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->DisplayName(),
+ Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
}
@@ -1074,7 +1086,7 @@ public:
if (Result.Error)
{
ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->DisplayName(),
+ Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
}
@@ -1120,7 +1132,7 @@ public:
if (Result.Error)
{
ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->DisplayName(),
+ Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
}
@@ -1156,7 +1168,7 @@ public:
if (Result.Error)
{
ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->DisplayName(),
+ Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
}
@@ -1196,8 +1208,10 @@ public:
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
{
+ const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo();
Status.BeginObject();
- Status << "name" << Ep->DisplayName();
+ Status << "name" << Info.Name;
+ Status << "url" << Info.Url;
Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
UpstreamEndpointStats& Stats = Ep->Stats();
@@ -1258,7 +1272,7 @@ private:
ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
- Endpoint->DisplayName(),
+ Endpoint->GetEndpointInfo().Url,
Result.Reason);
}
}
@@ -1276,12 +1290,12 @@ private:
{
ProcessCacheRecord(std::move(CacheRecord));
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
- e.what());
+ Err.what());
}
}
@@ -1310,20 +1324,21 @@ private:
{
if (!Endpoint->IsHealthy())
{
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
{
- ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
+ ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason);
}
else
{
- ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
}
}
}
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", e.what());
+ ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what());
}
}
}
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 520f5b99d..c463c4996 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -100,6 +100,12 @@ struct CachePayloadGetCompleteParams
using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>;
+struct UpstreamEndpointInfo
+{
+ std::string Name;
+ std::string Url;
+};
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -108,14 +114,14 @@ class UpstreamEndpoint
public:
virtual ~UpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;
+
virtual UpstreamEndpointHealth Initialize() = 0;
virtual bool IsHealthy() const = 0;
virtual UpstreamEndpointHealth CheckHealth() = 0;
- virtual std::string_view DisplayName() const = 0;
-
virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,