aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute
diff options
context:
space:
mode:
author: Stefan Boberg <[email protected]> 2026-04-02 22:52:53 +0200
committer: Stefan Boberg <[email protected]> 2026-04-02 22:52:53 +0200
commit: 1f766abf8fce4bf24d600bc5cb3c3c396f8a29ca (patch)
tree: b3b60be4789caaed07903c07ca7a5107fe49029f /src/zencompute
parent: Add MemoryCidStore: memory-backed chunk store with async write-through (diff)
parent: Add hostname and payload stats to remote submit warnings (diff)
downloadzen-sb/compute-oidc-auth.tar.xz
zen-sb/compute-oidc-auth.zip
Merge branch 'sb/compute-oidc-auth' of https://github.ol.epicgames.net/ue-foundation/zen into sb/compute-oidc-auth [sb/compute-oidc-auth]
Diffstat (limited to 'src/zencompute')
-rw-r--r--src/zencompute/computeservice.cpp1
-rw-r--r--src/zencompute/runners/functionrunner.cpp16
-rw-r--r--src/zencompute/runners/functionrunner.h15
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp18
-rw-r--r--src/zencompute/runners/remotehttprunner.h4
5 files changed, 50 insertions, 4 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index 857c2c567..16b95cb4e 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -617,6 +617,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState()
m_KnownWorkerUris.insert(UriStr);
auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool);
+ NewRunner->SetRemoteHostname(Hostname);
SyncWorkersToRunner(*NewRunner);
m_RemoteRunnerGroup.AddRunner(NewRunner);
}
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index f3d5acca5..ab22c6363 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,7 +6,9 @@
# include <zencore/compactbinary.h>
# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
# include <zencore/logging.h>
+# include <zencore/string.h>
# include <zencore/timer.h>
# include <zencore/trace.h>
# include <zencore/workthreadpool.h>
@@ -206,16 +208,24 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
static constexpr uint64_t SubmitWarnThresholdMs = 500;
auto SubmitToRunner = [&](int RunnerIndex) {
+ auto& Runner = Runners[RunnerIndex];
+ Runner->m_LastSubmitStats.Reset();
+
Stopwatch Timer;
- PerRunnerResults[RunnerIndex] = Runners[RunnerIndex]->SubmitActions(PerRunnerActions[RunnerIndex]);
+ PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]);
uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
if (ElapsedMs >= SubmitWarnThresholdMs)
{
- ZEN_WARN("submit of {} actions to '{}' took {}ms",
+ size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed);
+ uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed);
+
+ ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms",
PerRunnerActions[RunnerIndex].size(),
- Runners[RunnerIndex]->GetDisplayName(),
+ Attachments,
+ NiceBytes(AttachmentBytes),
+ Runner->GetDisplayName(),
ElapsedMs);
}
};
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index a8234237c..449f0e228 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -43,6 +43,21 @@ public:
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
[[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; }
+ // Attachment counters accumulated during the most recent SubmitActions call on this runner.
+ // Reset before each call, populated by the runner implementation (incremented with relaxed fetch_add).
+ struct SubmitStats
+ {
+ std::atomic<size_t> TotalAttachments{0}; // number of attachments added to outgoing packages
+ std::atomic<uint64_t> TotalAttachmentBytes{0}; // sum of the added chunks' GetSize() values
+
+ void Reset() // zeroes both counters; relaxed ordering, so no ordering with other memory is implied
+ {
+ TotalAttachments.store(0, std::memory_order_relaxed);
+ TotalAttachmentBytes.store(0, std::memory_order_relaxed);
+ }
+ };
+ SubmitStats m_LastSubmitStats; // one instance per runner; NOTE(review): overlapping submits to the same runner would share/reset these counters - confirm callers serialize per runner
+
// Best-effort cancellation of a specific in-flight action. Returns true if the
// cancellation signal was successfully sent. The action will transition to Cancelled
// asynchronously once the platform-level termination completes.
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index 1039969b4..55f78fdd6 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -39,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
+, m_DisplayName{HostName}
, m_BaseUrl{fmt::format("{}/compute", HostName)}
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
@@ -60,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
+void
+RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname) // Rebuilds the display name to include the given hostname.
+{
+ if (!Hostname.empty()) // an empty hostname leaves m_DisplayName untouched
+ {
+ m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname); // "<m_HostName> (<Hostname>)"; NOTE(review): unsynchronized write and the constructor has already started m_MonitorThread - confirm no thread reads GetDisplayName() concurrently
+ }
+}
+
RemoteHttpRunner::~RemoteHttpRunner()
{
Shutdown();
@@ -394,6 +404,8 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
});
@@ -460,6 +472,8 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
ZEN_ASSERT(DataRawHash == NeedHash);
Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
else
{
@@ -583,6 +597,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
Pkg.AddAttachment(CbAttachment(Compressed, AttachHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
}
@@ -623,6 +639,8 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec
ZEN_ASSERT(DataRawHash == NeedHash);
Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed);
+ m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed);
}
else
{
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index 2010c98e8..fdf113c77 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -54,9 +54,10 @@ public:
[[nodiscard]] virtual size_t QueryCapacity() override;
[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
virtual void CancelRemoteQueue(int QueueId) override;
- [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_HostName; }
+ [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; }
std::string_view GetHostName() const { return m_HostName; }
+ void SetRemoteHostname(std::string_view Hostname);
protected:
LoggerRef Log() { return m_Log; }
@@ -66,6 +67,7 @@ private:
ChunkResolver& m_ChunkResolver;
WorkerThreadPool& m_WorkerPool;
std::string m_HostName;
+ std::string m_DisplayName;
std::string m_BaseUrl;
HttpClient m_Http;