aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamapply.cpp
diff options
context:
space:
mode:
authorJoe Kirchoff <[email protected]>2022-03-22 11:47:38 -0700
committerGitHub <[email protected]>2022-03-22 11:47:38 -0700
commitcc5adf4cb79c92993fabfe09e75dfadb7d4c9665 (patch)
tree4ba0a18f68e39685fa784d872bbb4bb9ba2b6fd7 /zenserver/upstream/upstreamapply.cpp
parentmove workthreadpool to zencore (#63) (diff)
downloadzen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.tar.xz
zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.zip
Enable Horde compute code on Linux & Mac (#61)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
-rw-r--r--zenserver/upstream/upstreamapply.cpp107
1 file changed, 26 insertions, 81 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 918697224..fd304adb8 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -59,9 +59,9 @@ namespace detail {
CidStore& CidStore,
AuthMgr& Mgr)
: m_Log(logging::Get("upstream-apply"))
- , m_AuthMgr(Mgr)
, m_CasStore(CasStore)
, m_CidStore(CidStore)
+ , m_AuthMgr(Mgr)
{
m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl);
m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString());
@@ -434,9 +434,6 @@ namespace detail {
CbObjectView Status = It.AsObjectView();
const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
- const std::string_view AgentId = TaskStatus["a"sv].AsString();
- const std::string_view LeaseId = TaskStatus["l"sv].AsString();
-
// Only care about completed tasks
if (State != ComputeTaskState::Complete)
{
@@ -486,10 +483,10 @@ namespace detail {
private:
spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
CasStore& m_CasStore;
CidStore& m_CidStore;
AuthMgr& m_AuthMgr;
- spdlog::logger& m_Log;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
RefPtr<CloudCacheClient> m_StorageClient;
@@ -532,11 +529,7 @@ namespace detail {
return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}};
}
- const IoHash TaskId = TaskStatus["h"sv].AsHash();
- const DateTime Time = TaskStatus["t"sv].AsDateTime();
- const IoHash ResultHash = TaskStatus["r"sv].AsHash();
- const std::string_view AgentId = TaskStatus["a"sv].AsString();
- const std::string_view LeaseId = TaskStatus["l"sv].AsString();
+ const IoHash ResultHash = TaskStatus["r"sv].AsHash();
int64_t Bytes{};
double ElapsedSeconds{};
@@ -828,7 +821,7 @@ namespace detail {
{
std::string_view Env = It.AsString();
auto Index = Env.find('=');
- if (Index < 0)
+ if (Index == std::string_view::npos)
{
Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
return false;
@@ -910,7 +903,7 @@ namespace detail {
}
if (Memory > 0)
{
- Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL);
+ Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL);
}
CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
@@ -1063,7 +1056,7 @@ namespace detail {
DirectoryTreeWriter.EndArray();
}
- return std::move(DirectoryTreeWriter.Save());
+ return DirectoryTreeWriter.Save();
}
[[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
@@ -1085,7 +1078,7 @@ namespace detail {
Writer.EndArray();
}
Writer.AddBool("e", Exclusive);
- return std::move(Writer.Save());
+ return Writer.Save();
}
[[nodiscard]] CbObject BuildTask(const std::string_view Executable,
@@ -1141,7 +1134,7 @@ namespace detail {
TaskWriter.EndArray();
}
- return std::move(TaskWriter.Save());
+ return TaskWriter.Save();
}
};
} // namespace detail
@@ -1154,46 +1147,33 @@ struct UpstreamApplyStats
UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
- void Add(spdlog::logger& Logger,
- UpstreamApplyEndpoint& Endpoint,
- const PostUpstreamApplyResult& Result,
- const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result)
{
UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
{
- Stats.ErrorCount++;
+ Stats.ErrorCount.Increment(1);
}
else if (Result.Success)
{
- Stats.PostCount++;
- Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
- Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
- }
-
- if (m_Enabled && m_SampleCount++ % MaxSampleCount)
- {
- Dump(Logger, Endpoints);
+ Stats.PostCount.Increment(1);
+ Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024);
}
}
- void Add(spdlog::logger& Logger,
- UpstreamApplyEndpoint& Endpoint,
- const GetUpstreamApplyUpdatesResult& Result,
- const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result)
{
UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
{
- Stats.ErrorCount++;
+ Stats.ErrorCount.Increment(1);
}
else if (Result.Success)
{
- Stats.UpdateCount++;
- Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
- Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+ Stats.UpdateCount.Increment(1);
+ Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024);
if (!Result.Completed.empty())
{
uint64_t Completed = 0;
@@ -1201,47 +1181,12 @@ struct UpstreamApplyStats
{
Completed += It.second.size();
}
- Stats.CompleteCount.fetch_add(Completed);
+ Stats.CompleteCount.Increment(Completed);
}
}
-
- if (m_Enabled && m_SampleCount++ % MaxSampleCount)
- {
- Dump(Logger, Endpoints);
- }
- }
-
- void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
- {
- for (auto& Ep : Endpoints)
- {
- // These stats will not be totally correct as the numbers are not captured atomically
-
- UpstreamApplyEndpointStats& Stats = Ep->Stats();
- const uint64_t PostCount = Stats.PostCount;
- const uint64_t CompleteCount = Stats.CompleteCount;
- // const uint64_t UpdateCount = Stats.UpdateCount;
- const double DownBytes = Stats.DownBytes;
- const double SecondsDown = Stats.SecondsDown;
- const double UpBytes = Stats.UpBytes;
- const double SecondsUp = Stats.SecondsUp;
-
- const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
- const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
- const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
-
- Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- CompleteRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
- }
}
- bool m_Enabled;
- std::atomic_uint64_t m_SampleCount = {};
+ bool m_Enabled;
};
//////////////////////////////////////////////////////////////////////////
@@ -1364,19 +1309,19 @@ public:
Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
UpstreamApplyEndpointStats& Stats = Ep->Stats();
- const uint64_t PostCount = Stats.PostCount;
- const uint64_t CompleteCount = Stats.CompleteCount;
+ const uint64_t PostCount = Stats.PostCount.Value();
+ const uint64_t CompleteCount = Stats.CompleteCount.Value();
// const uint64_t UpdateCount = Stats.UpdateCount;
const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
Status << "post_count" << PostCount;
Status << "complete_count" << PostCount;
- Status << "update_count" << Stats.UpdateCount;
+ Status << "update_count" << Stats.UpdateCount.Value();
Status << "complete_ratio" << CompleteRate;
- Status << "downloaded_mb" << Stats.DownBytes;
- Status << "uploaded_mb" << Stats.UpBytes;
- Status << "error_count" << Stats.ErrorCount;
+ Status << "downloaded_mb" << Stats.DownBytes.Value();
+ Status << "uploaded_mb" << Stats.UpBytes.Value();
+ Status << "error_count" << Stats.ErrorCount.Value();
Status.EndObject();
}
@@ -1425,7 +1370,7 @@ private:
}
}
}
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ m_Stats.Add(*Endpoint, Result);
return;
}
}
@@ -1476,7 +1421,7 @@ private:
if (Endpoint->IsHealthy())
{
GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates();
- m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ m_Stats.Add(*Endpoint, Result);
if (!Result.Success)
{