aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordecomputebuffer.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-13 16:38:16 +0200
committerGitHub Enterprise <[email protected]>2026-04-13 16:38:16 +0200
commit795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch)
tree7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zenhorde/hordecomputebuffer.cpp
parent5.8.4-pre2 (diff)
downloadzen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz
zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control. ### Async Horde Agent Rewrite - Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool - Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport` - Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers - Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed) ### Provisioner Drain / Scale-Down - `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain` - Configurable `--horde-drain-grace-period` (default 300s) before force-kill - Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer - Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers ### OIDC Authentication - `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token` - Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode - New `--horde-oidctoken-exe-path` CLI option for explicit path override ### Orchestrator & Scheduler - Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers - New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment - Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock - Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings - New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions) - New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target` ### Remote Execution - Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits - Track in-flight submissions to prevent over-queuing - Show remote runner hostname in `GetDisplayName()` - `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address) ### Frontend Dashboard - Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules - Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count - Editable target-cores input with debounced POST to `/orch/provisioner/target` - Per-agent provisioning status badges (active / draining / deallocated) in the agents table - Active vs total CPU counts in agents summary row ### CLI - New `zen compute record-start` / `record-stop` subcommands - `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet) ### Other - `DataDir` supports environment variable expansion - Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories - Linux/Mac runners `nice(5)` child processes to avoid starving the main server - `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset` - Curl HTTP client logs effective URL on failure - `MachineInfo` carries `Pool` and `Mode` from Horde response - Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zenhorde/hordecomputebuffer.cpp')
-rw-r--r--src/zenhorde/hordecomputebuffer.cpp454
1 files changed, 0 insertions, 454 deletions
diff --git a/src/zenhorde/hordecomputebuffer.cpp b/src/zenhorde/hordecomputebuffer.cpp
deleted file mode 100644
index 0d032b5d5..000000000
--- a/src/zenhorde/hordecomputebuffer.cpp
+++ /dev/null
@@ -1,454 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "hordecomputebuffer.h"
-
-#include <algorithm>
-#include <cassert>
-#include <chrono>
-#include <condition_variable>
-#include <cstring>
-
-namespace zen::horde {
-
-// Simplified ring buffer implementation for in-process use only.
-// Uses a single contiguous buffer with write/read cursors and
-// mutex+condvar for synchronization. This is simpler than the UE version
-// which uses lock-free atomics and shared memory, but sufficient for our
-// use case where we're the initiator side of the compute protocol.
-
-struct ComputeBuffer::Detail : TRefCounted<Detail>
-{
- std::vector<uint8_t> Data;
- size_t NumChunks = 0;
- size_t ChunkLength = 0;
-
- // Current write state
- size_t WriteChunkIdx = 0;
- size_t WriteOffset = 0;
- bool WriteComplete = false;
-
- // Current read state
- size_t ReadChunkIdx = 0;
- size_t ReadOffset = 0;
- bool Detached = false;
-
- // Per-chunk written length
- std::vector<size_t> ChunkWrittenLength;
- std::vector<bool> ChunkFinished; // Writer moved to next chunk
-
- std::mutex Mutex;
- std::condition_variable ReadCV; ///< Signaled when new data is written or stream completes
- std::condition_variable WriteCV; ///< Signaled when reader advances past a chunk, freeing space
-
- bool HasWriter = false;
- bool HasReader = false;
-
- uint8_t* ChunkPtr(size_t ChunkIdx) { return Data.data() + ChunkIdx * ChunkLength; }
- const uint8_t* ChunkPtr(size_t ChunkIdx) const { return Data.data() + ChunkIdx * ChunkLength; }
-};
-
-// ComputeBuffer
-
-ComputeBuffer::ComputeBuffer()
-{
-}
-ComputeBuffer::~ComputeBuffer()
-{
-}
-
-bool
-ComputeBuffer::CreateNew(const Params& InParams)
-{
- auto* NewDetail = new Detail();
- NewDetail->NumChunks = InParams.NumChunks;
- NewDetail->ChunkLength = InParams.ChunkLength;
- NewDetail->Data.resize(InParams.NumChunks * InParams.ChunkLength, 0);
- NewDetail->ChunkWrittenLength.resize(InParams.NumChunks, 0);
- NewDetail->ChunkFinished.resize(InParams.NumChunks, false);
-
- m_Detail = NewDetail;
- return true;
-}
-
-void
-ComputeBuffer::Close()
-{
- m_Detail = nullptr;
-}
-
-bool
-ComputeBuffer::IsValid() const
-{
- return static_cast<bool>(m_Detail);
-}
-
-ComputeBufferReader
-ComputeBuffer::CreateReader()
-{
- assert(m_Detail);
- m_Detail->HasReader = true;
- return ComputeBufferReader(m_Detail);
-}
-
-ComputeBufferWriter
-ComputeBuffer::CreateWriter()
-{
- assert(m_Detail);
- m_Detail->HasWriter = true;
- return ComputeBufferWriter(m_Detail);
-}
-
-// ComputeBufferReader
-
-ComputeBufferReader::ComputeBufferReader()
-{
-}
-ComputeBufferReader::~ComputeBufferReader()
-{
-}
-
-ComputeBufferReader::ComputeBufferReader(const ComputeBufferReader& Other) = default;
-ComputeBufferReader::ComputeBufferReader(ComputeBufferReader&& Other) noexcept = default;
-ComputeBufferReader& ComputeBufferReader::operator=(const ComputeBufferReader& Other) = default;
-ComputeBufferReader& ComputeBufferReader::operator=(ComputeBufferReader&& Other) noexcept = default;
-
-ComputeBufferReader::ComputeBufferReader(Ref<ComputeBuffer::Detail> InDetail) : m_Detail(std::move(InDetail))
-{
-}
-
-void
-ComputeBufferReader::Close()
-{
- m_Detail = nullptr;
-}
-
-void
-ComputeBufferReader::Detach()
-{
- if (m_Detail)
- {
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- m_Detail->Detached = true;
- m_Detail->ReadCV.notify_all();
- }
-}
-
-bool
-ComputeBufferReader::IsValid() const
-{
- return static_cast<bool>(m_Detail);
-}
-
-bool
-ComputeBufferReader::IsComplete() const
-{
- if (!m_Detail)
- {
- return true;
- }
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- if (m_Detail->Detached)
- {
- return true;
- }
- return m_Detail->WriteComplete && m_Detail->ReadChunkIdx == m_Detail->WriteChunkIdx &&
- m_Detail->ReadOffset >= m_Detail->ChunkWrittenLength[m_Detail->ReadChunkIdx];
-}
-
-void
-ComputeBufferReader::AdvanceReadPosition(size_t Size)
-{
- if (!m_Detail)
- {
- return;
- }
-
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
-
- m_Detail->ReadOffset += Size;
-
- // Check if we need to move to next chunk
- const size_t ReadChunk = m_Detail->ReadChunkIdx;
- if (m_Detail->ChunkFinished[ReadChunk] && m_Detail->ReadOffset >= m_Detail->ChunkWrittenLength[ReadChunk])
- {
- const size_t NextChunk = (ReadChunk + 1) % m_Detail->NumChunks;
- m_Detail->ReadChunkIdx = NextChunk;
- m_Detail->ReadOffset = 0;
- m_Detail->WriteCV.notify_all();
- }
-
- m_Detail->ReadCV.notify_all();
-}
-
-size_t
-ComputeBufferReader::GetMaxReadSize() const
-{
- if (!m_Detail)
- {
- return 0;
- }
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- const size_t ReadChunk = m_Detail->ReadChunkIdx;
- return m_Detail->ChunkWrittenLength[ReadChunk] - m_Detail->ReadOffset;
-}
-
-const uint8_t*
-ComputeBufferReader::WaitToRead(size_t MinSize, int TimeoutMs, bool* OutTimedOut)
-{
- if (!m_Detail)
- {
- return nullptr;
- }
-
- std::unique_lock<std::mutex> Lock(m_Detail->Mutex);
-
- auto Predicate = [&]() -> bool {
- if (m_Detail->Detached)
- {
- return true;
- }
-
- const size_t ReadChunk = m_Detail->ReadChunkIdx;
- const size_t Available = m_Detail->ChunkWrittenLength[ReadChunk] - m_Detail->ReadOffset;
-
- if (Available >= MinSize)
- {
- return true;
- }
-
- // If chunk is finished and we've read everything, try to move to next
- if (m_Detail->ChunkFinished[ReadChunk] && m_Detail->ReadOffset >= m_Detail->ChunkWrittenLength[ReadChunk])
- {
- if (m_Detail->WriteComplete)
- {
- return true; // End of stream
- }
- // Move to next chunk
- const size_t NextChunk = (ReadChunk + 1) % m_Detail->NumChunks;
- m_Detail->ReadChunkIdx = NextChunk;
- m_Detail->ReadOffset = 0;
- m_Detail->WriteCV.notify_all();
- return false; // Re-check with new chunk
- }
-
- if (m_Detail->WriteComplete)
- {
- return true; // End of stream
- }
-
- return false;
- };
-
- if (TimeoutMs < 0)
- {
- m_Detail->ReadCV.wait(Lock, Predicate);
- }
- else
- {
- if (!m_Detail->ReadCV.wait_for(Lock, std::chrono::milliseconds(TimeoutMs), Predicate))
- {
- if (OutTimedOut)
- {
- *OutTimedOut = true;
- }
- return nullptr;
- }
- }
-
- if (m_Detail->Detached)
- {
- return nullptr;
- }
-
- const size_t ReadChunk = m_Detail->ReadChunkIdx;
- const size_t Available = m_Detail->ChunkWrittenLength[ReadChunk] - m_Detail->ReadOffset;
-
- if (Available < MinSize)
- {
- return nullptr; // End of stream
- }
-
- return m_Detail->ChunkPtr(ReadChunk) + m_Detail->ReadOffset;
-}
-
-size_t
-ComputeBufferReader::Read(void* Buffer, size_t MaxSize, int TimeoutMs, bool* OutTimedOut)
-{
- const uint8_t* Data = WaitToRead(1, TimeoutMs, OutTimedOut);
- if (!Data)
- {
- return 0;
- }
-
- const size_t Available = GetMaxReadSize();
- const size_t ToCopy = std::min(Available, MaxSize);
- memcpy(Buffer, Data, ToCopy);
- AdvanceReadPosition(ToCopy);
- return ToCopy;
-}
-
-// ComputeBufferWriter
-
-ComputeBufferWriter::ComputeBufferWriter() = default;
-ComputeBufferWriter::ComputeBufferWriter(const ComputeBufferWriter& Other) = default;
-ComputeBufferWriter::ComputeBufferWriter(ComputeBufferWriter&& Other) noexcept = default;
-ComputeBufferWriter::~ComputeBufferWriter() = default;
-ComputeBufferWriter& ComputeBufferWriter::operator=(const ComputeBufferWriter& Other) = default;
-ComputeBufferWriter& ComputeBufferWriter::operator=(ComputeBufferWriter&& Other) noexcept = default;
-
-ComputeBufferWriter::ComputeBufferWriter(Ref<ComputeBuffer::Detail> InDetail) : m_Detail(std::move(InDetail))
-{
-}
-
-void
-ComputeBufferWriter::Close()
-{
- if (m_Detail)
- {
- {
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- if (!m_Detail->WriteComplete)
- {
- m_Detail->WriteComplete = true;
- m_Detail->ReadCV.notify_all();
- }
- }
- m_Detail = nullptr;
- }
-}
-
-bool
-ComputeBufferWriter::IsValid() const
-{
- return static_cast<bool>(m_Detail);
-}
-
-void
-ComputeBufferWriter::MarkComplete()
-{
- if (m_Detail)
- {
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- m_Detail->WriteComplete = true;
- m_Detail->ReadCV.notify_all();
- }
-}
-
-void
-ComputeBufferWriter::AdvanceWritePosition(size_t Size)
-{
- if (!m_Detail || Size == 0)
- {
- return;
- }
-
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- const size_t WriteChunk = m_Detail->WriteChunkIdx;
- m_Detail->ChunkWrittenLength[WriteChunk] += Size;
- m_Detail->WriteOffset += Size;
- m_Detail->ReadCV.notify_all();
-}
-
-size_t
-ComputeBufferWriter::GetMaxWriteSize() const
-{
- if (!m_Detail)
- {
- return 0;
- }
- std::lock_guard<std::mutex> Lock(m_Detail->Mutex);
- const size_t WriteChunk = m_Detail->WriteChunkIdx;
- return m_Detail->ChunkLength - m_Detail->ChunkWrittenLength[WriteChunk];
-}
-
-size_t
-ComputeBufferWriter::GetChunkMaxLength() const
-{
- if (!m_Detail)
- {
- return 0;
- }
- return m_Detail->ChunkLength;
-}
-
-size_t
-ComputeBufferWriter::Write(const void* Buffer, size_t MaxSize, int TimeoutMs)
-{
- uint8_t* Dest = WaitToWrite(1, TimeoutMs);
- if (!Dest)
- {
- return 0;
- }
-
- const size_t Available = GetMaxWriteSize();
- const size_t ToCopy = std::min(Available, MaxSize);
- memcpy(Dest, Buffer, ToCopy);
- AdvanceWritePosition(ToCopy);
- return ToCopy;
-}
-
-uint8_t*
-ComputeBufferWriter::WaitToWrite(size_t MinSize, int TimeoutMs)
-{
- if (!m_Detail)
- {
- return nullptr;
- }
-
- std::unique_lock<std::mutex> Lock(m_Detail->Mutex);
-
- if (m_Detail->WriteComplete)
- {
- return nullptr;
- }
-
- const size_t WriteChunk = m_Detail->WriteChunkIdx;
- const size_t Available = m_Detail->ChunkLength - m_Detail->ChunkWrittenLength[WriteChunk];
-
- // If current chunk has enough space, return pointer
- if (Available >= MinSize)
- {
- return m_Detail->ChunkPtr(WriteChunk) + m_Detail->ChunkWrittenLength[WriteChunk];
- }
-
- // Current chunk is full - mark it as finished and move to next.
- // The writer cannot advance until the reader has fully consumed the next chunk,
- // preventing the writer from overwriting data the reader hasn't processed yet.
- m_Detail->ChunkFinished[WriteChunk] = true;
- m_Detail->ReadCV.notify_all();
-
- const size_t NextChunk = (WriteChunk + 1) % m_Detail->NumChunks;
-
- // Wait until reader has consumed the next chunk
- auto Predicate = [&]() -> bool {
- // Check if read has moved past this chunk
- return m_Detail->ReadChunkIdx != NextChunk || m_Detail->Detached;
- };
-
- if (TimeoutMs < 0)
- {
- m_Detail->WriteCV.wait(Lock, Predicate);
- }
- else
- {
- if (!m_Detail->WriteCV.wait_for(Lock, std::chrono::milliseconds(TimeoutMs), Predicate))
- {
- return nullptr;
- }
- }
-
- if (m_Detail->Detached)
- {
- return nullptr;
- }
-
- // Reset next chunk
- m_Detail->ChunkWrittenLength[NextChunk] = 0;
- m_Detail->ChunkFinished[NextChunk] = false;
- m_Detail->WriteChunkIdx = NextChunk;
- m_Detail->WriteOffset = 0;
-
- return m_Detail->ChunkPtr(NextChunk);
-}
-
-} // namespace zen::horde