diff options
| author | Stefan Boberg <[email protected]> | 2026-04-02 22:52:53 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-04-02 22:52:53 +0200 |
| commit | 1f766abf8fce4bf24d600bc5cb3c3c396f8a29ca (patch) | |
| tree | b3b60be4789caaed07903c07ca7a5107fe49029f /src/zencompute | |
| parent | Add MemoryCidStore: memory-backed chunk store with async write-through (diff) | |
| parent | Add hostname and payload stats to remote submit warnings (diff) | |
| download | zen-sb/compute-oidc-auth.tar.xz zen-sb/compute-oidc-auth.zip | |
Merge branch 'sb/compute-oidc-auth' of https://github.ol.epicgames.net/ue-foundation/zen into sb/compute-oidc-authsb/compute-oidc-auth
Diffstat (limited to 'src/zencompute')
| -rw-r--r-- | src/zencompute/computeservice.cpp | 1 | ||||
| -rw-r--r-- | src/zencompute/runners/functionrunner.cpp | 16 | ||||
| -rw-r--r-- | src/zencompute/runners/functionrunner.h | 15 | ||||
| -rw-r--r-- | src/zencompute/runners/remotehttprunner.cpp | 18 | ||||
| -rw-r--r-- | src/zencompute/runners/remotehttprunner.h | 4 |
5 files changed, 50 insertions, 4 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index 857c2c567..16b95cb4e 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -617,6 +617,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState() m_KnownWorkerUris.insert(UriStr); auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool); + NewRunner->SetRemoteHostname(Hostname); SyncWorkersToRunner(*NewRunner); m_RemoteRunnerGroup.AddRunner(NewRunner); } diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp index f3d5acca5..ab22c6363 100644 --- a/src/zencompute/runners/functionrunner.cpp +++ b/src/zencompute/runners/functionrunner.cpp @@ -6,7 +6,9 @@ # include <zencore/compactbinary.h> # include <zencore/filesystem.h> +# include <zencore/fmtutils.h> # include <zencore/logging.h> +# include <zencore/string.h> # include <zencore/timer.h> # include <zencore/trace.h> # include <zencore/workthreadpool.h> @@ -206,16 +208,24 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) static constexpr uint64_t SubmitWarnThresholdMs = 500; auto SubmitToRunner = [&](int RunnerIndex) { + auto& Runner = Runners[RunnerIndex]; + Runner->m_LastSubmitStats.Reset(); + Stopwatch Timer; - PerRunnerResults[RunnerIndex] = Runners[RunnerIndex]->SubmitActions(PerRunnerActions[RunnerIndex]); + PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]); uint64_t ElapsedMs = Timer.GetElapsedTimeMs(); if (ElapsedMs >= SubmitWarnThresholdMs) { - ZEN_WARN("submit of {} actions to '{}' took {}ms", + size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed); + uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed); + + ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms", PerRunnerActions[RunnerIndex].size(), - Runners[RunnerIndex]->GetDisplayName(), + Attachments, + NiceBytes(AttachmentBytes), + Runner->GetDisplayName(), ElapsedMs); } }; diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h index a8234237c..449f0e228 100644 --- a/src/zencompute/runners/functionrunner.h +++ b/src/zencompute/runners/functionrunner.h @@ -43,6 +43,21 @@ public: [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); [[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; } + // Accumulated stats from the most recent SubmitActions call. + // Reset before each call, populated by the runner implementation. + struct SubmitStats + { + std::atomic<size_t> TotalAttachments{0}; + std::atomic<uint64_t> TotalAttachmentBytes{0}; + + void Reset() + { + TotalAttachments.store(0, std::memory_order_relaxed); + TotalAttachmentBytes.store(0, std::memory_order_relaxed); + } + }; + SubmitStats m_LastSubmitStats; + // Best-effort cancellation of a specific in-flight action. Returns true if the // cancellation signal was successfully sent. The action will transition to Cancelled // asynchronously once the platform-level termination completes. diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp index 1039969b4..55f78fdd6 100644 --- a/src/zencompute/runners/remotehttprunner.cpp +++ b/src/zencompute/runners/remotehttprunner.cpp @@ -39,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, , m_ChunkResolver{InChunkResolver} , m_WorkerPool{InWorkerPool} , m_HostName{HostName} +, m_DisplayName{HostName} , m_BaseUrl{fmt::format("{}/compute", HostName)} , m_Http(m_BaseUrl) , m_InstanceId(Oid::NewOid()) @@ -60,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; } +void +RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname) +{ + if (!Hostname.empty()) + { + m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname); + } +} + RemoteHttpRunner::~RemoteHttpRunner() { Shutdown(); @@ -394,6 +404,8 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); Pkg.AddAttachment(CbAttachment(Compressed, AttachHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } }); @@ -460,6 +472,8 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } else { @@ -583,6 +597,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); Pkg.AddAttachment(CbAttachment(Compressed, AttachHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } } @@ -623,6 +639,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } else { diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h index 2010c98e8..fdf113c77 100644 --- a/src/zencompute/runners/remotehttprunner.h +++ b/src/zencompute/runners/remotehttprunner.h @@ -54,9 +54,10 @@ public: [[nodiscard]] virtual size_t QueryCapacity() override; [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override; virtual void CancelRemoteQueue(int QueueId) override; - [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_HostName; } + [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; } std::string_view GetHostName() const { return m_HostName; } + void SetRemoteHostname(std::string_view Hostname); protected: LoggerRef Log() { return m_Log; } @@ -66,6 +67,7 @@ private: ChunkResolver& m_ChunkResolver; WorkerThreadPool& m_WorkerPool; std::string m_HostName; + std::string m_DisplayName; std::string m_BaseUrl; HttpClient m_Http; |