diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-22 11:47:38 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-22 11:47:38 -0700 |
| commit | cc5adf4cb79c92993fabfe09e75dfadb7d4c9665 (patch) | |
| tree | 4ba0a18f68e39685fa784d872bbb4bb9ba2b6fd7 /zenserver/upstream/upstreamapply.cpp | |
| parent | move workthreadpool to zencore (#63) (diff) | |
| download | zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.tar.xz zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.zip | |
Enable Horde compute code on Linux & Mac (#61)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 107 |
1 files changed, 26 insertions, 81 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 918697224..fd304adb8 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -59,9 +59,9 @@ namespace detail { CidStore& CidStore, AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) - , m_AuthMgr(Mgr) , m_CasStore(CasStore) , m_CidStore(CidStore) + , m_AuthMgr(Mgr) { m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); @@ -434,9 +434,6 @@ namespace detail { CbObjectView Status = It.AsObjectView(); const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); - const std::string_view AgentId = TaskStatus["a"sv].AsString(); - const std::string_view LeaseId = TaskStatus["l"sv].AsString(); - // Only care about completed tasks if (State != ComputeTaskState::Complete) { @@ -486,10 +483,10 @@ namespace detail { private: spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; CasStore& m_CasStore; CidStore& m_CidStore; AuthMgr& m_AuthMgr; - spdlog::logger& m_Log; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; RefPtr<CloudCacheClient> m_StorageClient; @@ -532,11 +529,7 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; } - const IoHash TaskId = TaskStatus["h"sv].AsHash(); - const DateTime Time = TaskStatus["t"sv].AsDateTime(); - const IoHash ResultHash = TaskStatus["r"sv].AsHash(); - const std::string_view AgentId = TaskStatus["a"sv].AsString(); - const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + const IoHash ResultHash = TaskStatus["r"sv].AsHash(); int64_t Bytes{}; double ElapsedSeconds{}; @@ -828,7 +821,7 @@ namespace detail { { std::string_view Env = It.AsString(); auto Index = Env.find('='); - if (Index < 0) + if (Index == std::string_view::npos) { Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); return false; @@ -910,7 +903,7 @@ namespace detail { } if (Memory > 0) { - Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL); + Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); } CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); @@ -1063,7 +1056,7 @@ namespace detail { DirectoryTreeWriter.EndArray(); } - return std::move(DirectoryTreeWriter.Save()); + return DirectoryTreeWriter.Save(); } [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, @@ -1085,7 +1078,7 @@ namespace detail { Writer.EndArray(); } Writer.AddBool("e", Exclusive); - return std::move(Writer.Save()); + return Writer.Save(); } [[nodiscard]] CbObject BuildTask(const std::string_view Executable, @@ -1141,7 +1134,7 @@ namespace detail { TaskWriter.EndArray(); } - return std::move(TaskWriter.Save()); + return TaskWriter.Save(); } }; } // namespace detail @@ -1154,46 +1147,33 @@ struct UpstreamApplyStats UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} - void Add(spdlog::logger& Logger, - UpstreamApplyEndpoint& Endpoint, - const PostUpstreamApplyResult& Result, - const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { - Stats.ErrorCount++; + Stats.ErrorCount.Increment(1); } else if (Result.Success) { - Stats.PostCount++; - Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); - Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); - } - - if (m_Enabled && m_SampleCount++ % MaxSampleCount) - { - Dump(Logger, Endpoints); + Stats.PostCount.Increment(1); + Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024); } } - void Add(spdlog::logger& Logger, - UpstreamApplyEndpoint& Endpoint, - const GetUpstreamApplyUpdatesResult& Result, - const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result) { UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { - Stats.ErrorCount++; + Stats.ErrorCount.Increment(1); } else if (Result.Success) { - Stats.UpdateCount++; - Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); - Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); + Stats.UpdateCount.Increment(1); + Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024); if (!Result.Completed.empty()) { uint64_t Completed = 0; @@ -1201,47 +1181,12 @@ struct UpstreamApplyStats { Completed += It.second.size(); } - Stats.CompleteCount.fetch_add(Completed); + Stats.CompleteCount.Increment(Completed); } } - - if (m_Enabled && m_SampleCount++ % MaxSampleCount) - { - Dump(Logger, Endpoints); - } - } - - void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) - { - for (auto& Ep : Endpoints) - { - // These stats will not be totally correct as the numbers are not captured atomically - - UpstreamApplyEndpointStats& Stats = Ep->Stats(); - const uint64_t PostCount = Stats.PostCount; - const uint64_t CompleteCount = Stats.CompleteCount; - // const uint64_t UpdateCount = Stats.UpdateCount; - const double DownBytes = Stats.DownBytes; - const double SecondsDown = Stats.SecondsDown; - const double UpBytes = Stats.UpBytes; - const double SecondsUp = Stats.SecondsUp; - - const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; - const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; - const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; - - Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - CompleteRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); - } } - bool m_Enabled; - std::atomic_uint64_t m_SampleCount = {}; + bool m_Enabled; }; ////////////////////////////////////////////////////////////////////////// @@ -1364,19 +1309,19 @@ public: Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamApplyEndpointStats& Stats = Ep->Stats(); - const uint64_t PostCount = Stats.PostCount; - const uint64_t CompleteCount = Stats.CompleteCount; + const uint64_t PostCount = Stats.PostCount.Value(); + const uint64_t CompleteCount = Stats.CompleteCount.Value(); // const uint64_t UpdateCount = Stats.UpdateCount; const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; Status << "post_count" << PostCount; Status << "complete_count" << PostCount; - Status << "update_count" << Stats.UpdateCount; + Status << "update_count" << Stats.UpdateCount.Value(); Status << "complete_ratio" << CompleteRate; - Status << "downloaded_mb" << Stats.DownBytes; - Status << "uploaded_mb" << Stats.UpBytes; - Status << "error_count" << Stats.ErrorCount; + Status << "downloaded_mb" << Stats.DownBytes.Value(); + Status << "uploaded_mb" << Stats.UpBytes.Value(); + Status << "error_count" << Stats.ErrorCount.Value(); Status.EndObject(); } @@ -1425,7 +1370,7 @@ private: } } } - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + m_Stats.Add(*Endpoint, Result); return; } } @@ -1476,7 +1421,7 @@ private: if (Endpoint->IsHealthy()) { GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(); - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + m_Stats.Add(*Endpoint, Result); if (!Result.Success) { |