aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners')
-rw-r--r--src/zencompute/runners/functionrunner.cpp16
-rw-r--r--src/zencompute/runners/functionrunner.h15
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp18
-rw-r--r--src/zencompute/runners/remotehttprunner.h4
4 files changed, 49 insertions, 4 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index f3d5acca5..ab22c6363 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,7 +6,9 @@
# include <zencore/compactbinary.h>
# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
# include <zencore/logging.h>
+# include <zencore/string.h>
# include <zencore/timer.h>
# include <zencore/trace.h>
# include <zencore/workthreadpool.h>
@@ -206,16 +208,24 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
static constexpr uint64_t SubmitWarnThresholdMs = 500;
auto SubmitToRunner = [&](int RunnerIndex) {
+ auto& Runner = Runners[RunnerIndex];
+ Runner->m_LastSubmitStats.Reset();
+
Stopwatch Timer;
- PerRunnerResults[RunnerIndex] = Runners[RunnerIndex]->SubmitActions(PerRunnerActions[RunnerIndex]);
+ PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]);
uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
if (ElapsedMs >= SubmitWarnThresholdMs)
{
- ZEN_WARN("submit of {} actions to '{}' took {}ms",
+ size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed);
+ uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed);
+
+ ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms",
PerRunnerActions[RunnerIndex].size(),
- Runners[RunnerIndex]->GetDisplayName(),
+ Attachments,
+ NiceBytes(AttachmentBytes),
+ Runner->GetDisplayName(),
ElapsedMs);
}
};
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index a8234237c..449f0e228 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -43,6 +43,21 @@ public:
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
[[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; }
+ // Accumulated stats from the most recent SubmitActions call.
+ // Reset before each call, populated by the runner implementation.
+ struct SubmitStats
+ {
+ std::atomic<size_t> TotalAttachments{0};
+ std::atomic<uint64_t> TotalAttachmentBytes{0};
+
+ void Reset()
+ {
+ TotalAttachments.store(0, std::memory_order_relaxed);
+ TotalAttachmentBytes.store(0, std::memory_order_relaxed);
+ }
+ };
+ SubmitStats m_LastSubmitStats;
+
// Best-effort cancellation of a specific in-flight action. Returns true if the
// cancellation signal was successfully sent. The action will transition to Cancelled
// asynchronously once the platform-level termination completes.
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index 1039969b4..55f78fdd6 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -39,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
+, m_DisplayName{HostName}
, m_BaseUrl{fmt::format("{}/compute", HostName)}
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
@@ -60,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
+void
+RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname)
+{
+ if (!Hostname.empty())
+ {
+ m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname);
+ }
+}
+
RemoteHttpRunner::~RemoteHttpRunner()
{
Shutdown();
@@ -394,6 +404,8 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
});
@@ -460,6 +472,8 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
ZEN_ASSERT(DataRawHash == NeedHash);
Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
else
{
@@ -583,6 +597,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
}
@@ -623,6 +639,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
ZEN_ASSERT(DataRawHash == NeedHash);
Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
else
{
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index 2010c98e8..fdf113c77 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -54,9 +54,10 @@ public:
[[nodiscard]] virtual size_t QueryCapacity() override;
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
virtual void CancelRemoteQueue(int QueueId) override;
- [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_HostName; }
+ [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; }
std::string_view GetHostName() const { return m_HostName; }
+ void SetRemoteHostname(std::string_view Hostname);
protected:
LoggerRef Log() { return m_Log; }
@@ -66,6 +67,7 @@ private:
ChunkResolver& m_ChunkResolver;
WorkerThreadPool& m_WorkerPool;
std::string m_HostName;
+ std::string m_DisplayName;
std::string m_BaseUrl;
HttpClient m_Http;