From 4681b24d74063a5f49147515828ba909db853b00 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 2 Apr 2026 22:40:33 +0200 Subject: Add hostname and payload stats to remote submit warnings - Include remote hostname alongside URI in display name for remote runners, set from orchestrator discovery data - Track attachment count and total attachment bytes during remote submissions and include them in the slow-submit warning --- src/zencompute/computeservice.cpp | 1 + src/zencompute/runners/functionrunner.cpp | 16 +++++++++++++--- src/zencompute/runners/functionrunner.h | 15 +++++++++++++++ src/zencompute/runners/remotehttprunner.cpp | 18 ++++++++++++++++++ src/zencompute/runners/remotehttprunner.h | 4 +++- 5 files changed, 50 insertions(+), 4 deletions(-) (limited to 'src/zencompute') diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index 857c2c567..16b95cb4e 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -617,6 +617,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState() m_KnownWorkerUris.insert(UriStr); auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool); + NewRunner->SetRemoteHostname(Hostname); SyncWorkersToRunner(*NewRunner); m_RemoteRunnerGroup.AddRunner(NewRunner); } diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp index f3d5acca5..ab22c6363 100644 --- a/src/zencompute/runners/functionrunner.cpp +++ b/src/zencompute/runners/functionrunner.cpp @@ -6,7 +6,9 @@ # include # include +# include # include +# include # include # include # include @@ -206,16 +208,24 @@ BaseRunnerGroup::SubmitActions(const std::vector>& Actions) static constexpr uint64_t SubmitWarnThresholdMs = 500; auto SubmitToRunner = [&](int RunnerIndex) { + auto& Runner = Runners[RunnerIndex]; + Runner->m_LastSubmitStats.Reset(); + Stopwatch Timer; - PerRunnerResults[RunnerIndex] = Runners[RunnerIndex]->SubmitActions(PerRunnerActions[RunnerIndex]); + PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]); uint64_t ElapsedMs = Timer.GetElapsedTimeMs(); if (ElapsedMs >= SubmitWarnThresholdMs) { - ZEN_WARN("submit of {} actions to '{}' took {}ms", + size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed); + uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed); + + ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms", PerRunnerActions[RunnerIndex].size(), - Runners[RunnerIndex]->GetDisplayName(), + Attachments, + NiceBytes(AttachmentBytes), + Runner->GetDisplayName(), ElapsedMs); } }; diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h index a8234237c..449f0e228 100644 --- a/src/zencompute/runners/functionrunner.h +++ b/src/zencompute/runners/functionrunner.h @@ -43,6 +43,21 @@ public: [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions); [[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; } + // Accumulated stats from the most recent SubmitActions call. + // Reset before each call, populated by the runner implementation. + struct SubmitStats + { + std::atomic TotalAttachments{0}; + std::atomic TotalAttachmentBytes{0}; + + void Reset() + { + TotalAttachments.store(0, std::memory_order_relaxed); + TotalAttachmentBytes.store(0, std::memory_order_relaxed); + } + }; + SubmitStats m_LastSubmitStats; + // Best-effort cancellation of a specific in-flight action. Returns true if the // cancellation signal was successfully sent. The action will transition to Cancelled // asynchronously once the platform-level termination completes. diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp index 1039969b4..55f78fdd6 100644 --- a/src/zencompute/runners/remotehttprunner.cpp +++ b/src/zencompute/runners/remotehttprunner.cpp @@ -39,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, , m_ChunkResolver{InChunkResolver} , m_WorkerPool{InWorkerPool} , m_HostName{HostName} +, m_DisplayName{HostName} , m_BaseUrl{fmt::format("{}/compute", HostName)} , m_Http(m_BaseUrl) , m_InstanceId(Oid::NewOid()) @@ -60,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; } +void +RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname) +{ + if (!Hostname.empty()) + { + m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname); + } +} + RemoteHttpRunner::~RemoteHttpRunner() { Shutdown(); @@ -394,6 +404,8 @@ RemoteHttpRunner::SubmitAction(Ref Action) CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); Pkg.AddAttachment(CbAttachment(Compressed, AttachHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } }); @@ -460,6 +472,8 @@ RemoteHttpRunner::SubmitAction(Ref Action) ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } else { @@ -583,6 +597,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); Pkg.AddAttachment(CbAttachment(Compressed, AttachHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } } @@ -623,6 +639,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } else { diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h index 2010c98e8..fdf113c77 100644 --- a/src/zencompute/runners/remotehttprunner.h +++ b/src/zencompute/runners/remotehttprunner.h @@ -54,9 +54,10 @@ public: [[nodiscard]] virtual size_t QueryCapacity() override; [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override; virtual void CancelRemoteQueue(int QueueId) override; - [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_HostName; } + [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; } std::string_view GetHostName() const { return m_HostName; } + void SetRemoteHostname(std::string_view Hostname); protected: LoggerRef Log() { return m_Log; } @@ -66,6 +67,7 @@ private: ChunkResolver& m_ChunkResolver; WorkerThreadPool& m_WorkerPool; std::string m_HostName; + std::string m_DisplayName; std::string m_BaseUrl; HttpClient m_Http; -- cgit v1.2.3