aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-05 11:58:15 +0200
committerStefan Boberg <[email protected]>2026-04-05 11:58:15 +0200
commitc008abdb7efbce793115348adf2bcd91774432ee (patch)
tree627b2dfa4e1eb6201402cc95eb57e4f2aa05db69 /src/zenhorde
parentadd WriteCompressedFile RPC for compressed file transfers to HordeAgent (diff)
downloadzen-c008abdb7efbce793115348adf2bcd91774432ee.tar.xz
zen-c008abdb7efbce793115348adf2bcd91774432ee.zip
add compressed file upload path gated on ComputeProtocol version
When the Horde server reports ComputeProtocol >= CompressedFileTransfer (v3), the provisioner uploads binaries individually via WriteCompressedFile with IoHash caching instead of the legacy bundle protocol. This avoids the bundle packaging overhead and enables instant cache hits for unchanged files.
Diffstat (limited to 'src/zenhorde')
-rw-r--r--src/zenhorde/hordeagent.cpp51
-rw-r--r--src/zenhorde/hordeagent.h5
-rw-r--r--src/zenhorde/hordeclient.cpp6
-rw-r--r--src/zenhorde/hordeprovisioner.cpp51
-rw-r--r--src/zenhorde/include/zenhorde/hordeclient.h21
-rw-r--r--src/zenhorde/include/zenhorde/hordeconfig.h10
-rw-r--r--src/zenhorde/include/zenhorde/hordeprovisioner.h9
7 files changed, 132 insertions, 21 deletions
diff --git a/src/zenhorde/hordeagent.cpp b/src/zenhorde/hordeagent.cpp
index 8217ace81..480b4b985 100644
--- a/src/zenhorde/hordeagent.cpp
+++ b/src/zenhorde/hordeagent.cpp
@@ -201,6 +201,57 @@ HordeAgent::UploadBinaries(const std::filesystem::path& BundleDir, const std::st
}
bool
+HordeAgent::UploadCompressedFiles(const std::vector<std::filesystem::path>& FilePaths)
+{
+ ZEN_TRACE_CPU("HordeAgent::UploadCompressedFiles");
+
+ for (const std::filesystem::path& FilePath : FilePaths)
+ {
+ std::error_code Ec;
+ BasicFile File;
+ File.Open(FilePath, BasicFile::Mode::kRead, Ec);
+ if (Ec)
+ {
+ if (!std::filesystem::exists(FilePath))
+ {
+ ZEN_DEBUG("skipping missing optional file: '{}'", FilePath);
+ continue;
+ }
+ ZEN_ERROR("failed to open file for compressed upload: '{}'", FilePath);
+ return false;
+ }
+
+ IoBuffer RawData = File.ReadAll();
+ if (RawData.GetSize() == 0)
+ {
+ ZEN_WARN("empty file, skipping: '{}'", FilePath);
+ continue;
+ }
+
+ const IoHash Hash = IoHash::HashBuffer(RawData.GetData(), RawData.GetSize());
+ const SharedBuffer Shared(std::move(RawData));
+ const CompressedBuffer Compressed = CompressedBuffer::Compress(Shared);
+
+ if (!Compressed)
+ {
+ ZEN_ERROR("failed to compress file: '{}'", FilePath);
+ return false;
+ }
+
+ const std::string SandboxPath = FilePath.filename().string();
+ if (!UploadCompressedFile(SandboxPath, Compressed, Hash))
+ {
+ ZEN_ERROR("failed to upload compressed file: '{}'", FilePath);
+ return false;
+ }
+
+ ZEN_INFO("uploaded '{}' ({} bytes compressed)", SandboxPath, Compressed.GetCompressedSize());
+ }
+
+ return true;
+}
+
+bool
HordeAgent::UploadCompressedFile(std::string_view SandboxRelativePath, const CompressedBuffer& Compressed, const IoHash& UncompressedHash)
{
ZEN_TRACE_CPU("HordeAgent::UploadCompressedFile");
diff --git a/src/zenhorde/hordeagent.h b/src/zenhorde/hordeagent.h
index ee36e2c44..6456b2764 100644
--- a/src/zenhorde/hordeagent.h
+++ b/src/zenhorde/hordeagent.h
@@ -45,6 +45,11 @@ public:
* @param BundleLocator Locator string identifying the bundle (from CreateBundle). */
bool UploadBinaries(const std::filesystem::path& BundleDir, const std::string& BundleLocator);
+ /** Upload files to the agent's sandbox using compressed transfer with IoHash caching.
+ * Each file is individually compressed, hashed, and sent via WriteCompressedFile.
+ * @param FilePaths Absolute paths to the files to upload. */
+ bool UploadCompressedFiles(const std::vector<std::filesystem::path>& FilePaths);
+
/** Upload a file to the agent's sandbox using compressed transfer with IoHash caching.
* The agent caches files by IoHash, so repeated uploads of the same content are instant.
* @param SandboxRelativePath Path relative to the agent's sandbox root.
diff --git a/src/zenhorde/hordeclient.cpp b/src/zenhorde/hordeclient.cpp
index 0eefc57c6..11cac0cb3 100644
--- a/src/zenhorde/hordeclient.cpp
+++ b/src/zenhorde/hordeclient.cpp
@@ -100,6 +100,7 @@ HordeClient::BuildRequestBody() const
json11::Json::object Root;
Root["requirements"] = Requirements;
Root["connection"] = Connection;
+ Root["protocol"] = static_cast<int>(ComputeProtocol::CompressedFileTransfer);
return json11::Json(Root).dump();
}
@@ -367,6 +368,11 @@ HordeClient::RequestMachine(const std::string& RequestBody, const std::string& C
OutMachine.LeaseId = LeaseIdVal.string_value();
}
+ if (const json11::Json ProtocolVal = Json["protocol"]; ProtocolVal.is_number())
+ {
+ OutMachine.Protocol = static_cast<ComputeProtocol>(ProtocolVal.int_value());
+ }
+
ZEN_INFO("Horde machine assigned [{}:{}] cores={} lease={}",
OutMachine.GetConnectionAddress(),
OutMachine.GetConnectionPort(),
diff --git a/src/zenhorde/hordeprovisioner.cpp b/src/zenhorde/hordeprovisioner.cpp
index f88c95da2..79dacf57e 100644
--- a/src/zenhorde/hordeprovisioner.cpp
+++ b/src/zenhorde/hordeprovisioner.cpp
@@ -154,19 +154,23 @@ HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper)
{
const std::filesystem::path OutputDir = m_WorkingDir / "horde_bundles";
- std::vector<BundleFile> Files;
-
#if ZEN_PLATFORM_WINDOWS
- Files.emplace_back(m_BinariesPath / "zenserver.exe", false);
+ m_BinaryFiles.push_back({m_BinariesPath / "zenserver.exe", false});
#elif ZEN_PLATFORM_LINUX
- Files.emplace_back(m_BinariesPath / "zenserver", false);
- Files.emplace_back(m_BinariesPath / "zenserver.debug", true);
+ m_BinaryFiles.push_back({m_BinariesPath / "zenserver", false});
+ m_BinaryFiles.push_back({m_BinariesPath / "zenserver.debug", true});
#elif ZEN_PLATFORM_MAC
- Files.emplace_back(m_BinariesPath / "zenserver", false);
+ m_BinaryFiles.push_back({m_BinariesPath / "zenserver", false});
#endif
+ std::vector<BundleFile> BundleFiles;
+ for (const BinaryFileEntry& Entry : m_BinaryFiles)
+ {
+ BundleFiles.emplace_back(Entry.Path, Entry.Optional);
+ }
+
BundleResult Result;
- if (!BundleCreator::CreateBundle(Files, OutputDir, Result))
+ if (!BundleCreator::CreateBundle(BundleFiles, OutputDir, Result))
{
ZEN_WARN("failed to create bundle, cannot provision any agents!");
m_AskForAgents.store(false);
@@ -274,19 +278,46 @@ HordeProvisioner::ThreadAgent(AgentWrapper& Wrapper)
return;
}
- for (auto& [Locator, BundleDir] : m_Bundles)
+ if (Machine.Protocol >= ComputeProtocol::CompressedFileTransfer)
{
+ // Compressed file transfer: upload each binary individually with IoHash caching
+ std::vector<std::filesystem::path> FilePaths;
+ {
+ std::lock_guard<std::mutex> BundleLock(m_BundleLock);
+ for (const BinaryFileEntry& Entry : m_BinaryFiles)
+ {
+ FilePaths.push_back(Entry.Path);
+ }
+ }
+
if (Wrapper.ShouldExit.load())
{
return;
}
- if (!Agent->UploadBinaries(BundleDir, Locator))
+ if (!Agent->UploadCompressedFiles(FilePaths))
{
- ZEN_WARN("UploadBinaries failed");
+ ZEN_WARN("UploadCompressedFiles failed");
return;
}
}
+ else
+ {
+ // Legacy bundle upload
+ for (auto& [Locator, BundleDir] : m_Bundles)
+ {
+ if (Wrapper.ShouldExit.load())
+ {
+ return;
+ }
+
+ if (!Agent->UploadBinaries(BundleDir, Locator))
+ {
+ ZEN_WARN("UploadBinaries failed");
+ return;
+ }
+ }
+ }
if (Wrapper.ShouldExit.load())
{
diff --git a/src/zenhorde/include/zenhorde/hordeclient.h b/src/zenhorde/include/zenhorde/hordeclient.h
index 201d68b83..98cc8f099 100644
--- a/src/zenhorde/include/zenhorde/hordeclient.h
+++ b/src/zenhorde/include/zenhorde/hordeclient.h
@@ -35,16 +35,17 @@ struct PortInfo
*/
struct MachineInfo
{
- std::string Ip;
- ConnectionMode Mode = ConnectionMode::Direct;
- std::string ConnectionAddress; ///< Relay/tunnel address (used when Mode != Direct)
- uint16_t Port = 0;
- uint16_t LogicalCores = 0;
- Encryption EncryptionMode = Encryption::None;
- uint8_t Nonce[NonceSize] = {}; ///< 64-byte nonce sent during TCP handshake
- uint8_t Key[KeySize] = {}; ///< 32-byte AES key (when EncryptionMode == AES)
- bool IsWindows = false;
- std::string LeaseId;
+ std::string Ip;
+ ConnectionMode Mode = ConnectionMode::Direct;
+ std::string ConnectionAddress; ///< Relay/tunnel address (used when Mode != Direct)
+ uint16_t Port = 0;
+ uint16_t LogicalCores = 0;
+ Encryption EncryptionMode = Encryption::None;
+ uint8_t Nonce[NonceSize] = {}; ///< 64-byte nonce sent during TCP handshake
+ uint8_t Key[KeySize] = {}; ///< 32-byte AES key (when EncryptionMode == AES)
+ bool IsWindows = false;
+ std::string LeaseId;
+ ComputeProtocol Protocol = ComputeProtocol::Initial;
std::map<std::string, PortInfo> Ports;
diff --git a/src/zenhorde/include/zenhorde/hordeconfig.h b/src/zenhorde/include/zenhorde/hordeconfig.h
index dd70f9832..abc5f938c 100644
--- a/src/zenhorde/include/zenhorde/hordeconfig.h
+++ b/src/zenhorde/include/zenhorde/hordeconfig.h
@@ -8,6 +8,16 @@
namespace zen::horde {
+/** Protocol version for the Horde compute transport.
+ * Both sides learn this from the Horde server's ComputeTask — it is never exchanged on the wire. */
+enum class ComputeProtocol : int
+{
+ Unknown = 0,
+ Initial = 1,
+ NewCpuEnvVars = 2,
+ CompressedFileTransfer = 3,
+};
+
/** Transport connection mode for Horde compute agents. */
enum class ConnectionMode
{
diff --git a/src/zenhorde/include/zenhorde/hordeprovisioner.h b/src/zenhorde/include/zenhorde/hordeprovisioner.h
index 4e2e63bbd..909d6d528 100644
--- a/src/zenhorde/include/zenhorde/hordeprovisioner.h
+++ b/src/zenhorde/include/zenhorde/hordeprovisioner.h
@@ -87,7 +87,14 @@ private:
std::unique_ptr<HordeClient> m_HordeClient;
- std::mutex m_BundleLock;
+ struct BinaryFileEntry
+ {
+ std::filesystem::path Path;
+ bool Optional;
+ };
+
+ std::mutex m_BundleLock;
+ std::vector<BinaryFileEntry> m_BinaryFiles; ///< Files to upload (populated once, guarded by m_BundleLock)
std::vector<std::pair<std::string, std::filesystem::path>> m_Bundles; ///< (locator, bundleDir) pairs
bool m_BundlesCreated = false;