aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-18 11:28:03 +0100
committerGitHub Enterprise <[email protected]>2026-02-18 11:28:03 +0100
commit149a5c2faa8d59290b8b44717e504532e906aae2 (patch)
tree9c875f1fd89f65f939bf8f6ef67b506565be845c /src/zen/cmds/exec_cmd.cpp
parentadd selective request logging support to http.sys (#762) (diff)
downloadarchived-zen-149a5c2faa8d59290b8b44717e504532e906aae2.tar.xz
archived-zen-149a5c2faa8d59290b8b44717e504532e906aae2.zip
structured compute basics (#714)
This change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver. This change also adds a distinct zenserver compute mode (`zenserver compute`), which is intended to be used for leaf compute nodes. To exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system. All new functionality is considered *experimental* and is disabled by default at this time, behind the `zencompute` option in the xmake config.
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp654
1 files changed, 654 insertions, 0 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
new file mode 100644
index 000000000..2d9d0d12e
--- /dev/null
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -0,0 +1,654 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "exec_cmd.h"
+
+#include <zencompute/functionservice.h>
+#include <zencompute/recordingreader.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/string.h>
+#include <zencore/timer.h>
+
+#include <EASTL/hash_map.h>
+#include <EASTL/hash_set.h>
+#include <EASTL/map.h>
+
+using namespace std::literals;
+
+namespace eastl {
+
+// Lets zen::IoHash be used as the key type of the EASTL hashed containers in
+// this file (eastl::hash_map / eastl::hash_set) by reusing IoHash's own hasher.
+template<>
+struct hash<zen::IoHash> : public zen::IoHash::Hasher {};
+
+}  // namespace eastl
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen {
+
+ExecCommand::ExecCommand()
+{
+    // Every option is registered in the default (unnamed) option group, and most
+    // of them only have a long name.
+    const char* const DefaultGroup = "";
+    const char* const NoShortName  = "";
+
+    m_Options.add_options()("h,help", "Print help");
+
+    // Target host and recording locations
+    m_Options.add_option(DefaultGroup, "u", "hosturl", "Host URL", cxxopts::value(m_HostName), "<hosturl>");
+    m_Options.add_option(DefaultGroup, NoShortName, "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>");
+    m_Options.add_option(DefaultGroup,
+                         "p",
+                         "path",
+                         "Recording path (directory or .actionlog file)",
+                         cxxopts::value(m_RecordingPath),
+                         "<path>");
+
+    // Replay window selection
+    m_Options.add_option(DefaultGroup, NoShortName, "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>");
+    m_Options.add_option(DefaultGroup, NoShortName, "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>");
+    m_Options.add_option(DefaultGroup, NoShortName, "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>");
+
+    m_Options.add_option(DefaultGroup, NoShortName, "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>");
+
+    // Execution mode; may also be given as the first positional argument
+    m_Options.add_option(DefaultGroup,
+                         NoShortName,
+                         "mode",
+                         "Select execution mode (http,inproc,dump,direct,beacon,buildlog)",
+                         cxxopts::value(m_Mode)->default_value("http"),
+                         "<string>");
+
+    m_Options.add_option(DefaultGroup, NoShortName, "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>");
+
+    m_Options.parse_positional("mode");
+}
+
+// Nothing to release explicitly; members clean up after themselves.
+ExecCommand::~ExecCommand() = default;
+
+// Entry point for `zen exec`: parses the command line, opens the recording,
+// registers the workers it describes and then dispatches to the handler for
+// the selected execution mode. The outcome is reported on the console.
+void
+ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+    // Parse and validate the command line
+
+    if (!ParseOptions(argc, argv))
+    {
+        return;
+    }
+
+    m_HostName = ResolveTargetHostSpec(m_HostName);
+
+    if (m_RecordingPath.empty())
+    {
+        throw OptionParseException("replay path is required!", m_Options.help());
+    }
+
+    // Verbose wins over quiet when both are requested
+    m_VerboseLogging = GlobalOptions.IsVerbose;
+    m_QuietLogging   = m_Quiet && !m_VerboseLogging;
+
+    enum ExecMode
+    {
+        kHttp,
+        kDirect,
+        kInproc,
+        kDump,
+        kBeacon,
+        kBuildLog
+    };
+
+    struct ModeName
+    {
+        std::string_view Name;
+        ExecMode         Value;
+    };
+
+    static constexpr ModeName KnownModes[] = {
+        {"http"sv, kHttp},
+        {"direct"sv, kDirect},
+        {"inproc"sv, kInproc},
+        {"dump"sv, kDump},
+        {"beacon"sv, kBeacon},
+        {"buildlog"sv, kBuildLog},
+    };
+
+    ExecMode Mode        = kHttp;
+    bool     IsKnownMode = false;
+
+    for (const ModeName& Candidate : KnownModes)
+    {
+        if (m_Mode == Candidate.Name)
+        {
+            Mode        = Candidate.Value;
+            IsKnownMode = true;
+            break;
+        }
+    }
+
+    if (!IsKnownMode)
+    {
+        throw OptionParseException("invalid mode specified!", m_Options.help());
+    }
+
+    // Open the recording. The readers are owned by this function; the member
+    // pointers set below refer to them and are only valid while Run() executes.
+
+    std::unique_ptr<zen::compute::RecordingReader>   Reader;
+    std::unique_ptr<zen::compute::UeRecordingReader> UeReader;
+
+    std::filesystem::path RecordingDir{m_RecordingPath};
+
+    if (!std::filesystem::is_directory(RecordingDir))
+    {
+        throw OptionParseException("replay path should be a directory path!", m_Options.help());
+    }
+
+    // A "cid" sub-directory selects RecordingReader; anything else is handed to
+    // UeRecordingReader.
+    const bool HasCidDirectory = std::filesystem::is_directory(RecordingDir / "cid");
+
+    if (HasCidDirectory)
+    {
+        Reader            = std::make_unique<zen::compute::RecordingReader>(RecordingDir);
+        m_WorkerMap       = Reader->ReadWorkers();
+        m_ChunkResolver   = Reader.get();
+        m_RecordingReader = Reader.get();
+    }
+    else
+    {
+        UeReader          = std::make_unique<zen::compute::UeRecordingReader>(RecordingDir);
+        m_WorkerMap       = UeReader->ReadWorkers();
+        m_ChunkResolver   = UeReader.get();
+        m_RecordingReader = UeReader.get();
+    }
+
+    ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount());
+
+    // Collect the functions advertised by every worker in the recording
+
+    for (auto& WorkerEntry : m_WorkerMap)
+    {
+        CbObject      Description = WorkerEntry.second.GetObject();
+        const IoHash& Id          = WorkerEntry.first;
+
+        RegisterWorkerFunctionsFromDescription(Description, Id);
+
+        if (m_VerboseLogging)
+        {
+            zen::ExtendableStringBuilder<1024> Text;
+# if 0
+            zen::CompactBinaryToJson(Description, Text);
+            ZEN_CONSOLE("worker {}: {}", Id, Text);
+# else
+            zen::CompactBinaryToYaml(Description, Text);
+            ZEN_CONSOLE("worker {}:\n{}", Id, Text);
+# endif
+        }
+    }
+
+    if (m_VerboseLogging)
+    {
+        EmitFunctionList(m_FunctionList);
+    }
+
+    // Dispatch to the handler for the selected mode and time it
+
+    int       ExitCode = 0;
+    Stopwatch Timer;
+
+    switch (Mode)
+    {
+        case kHttp:
+            // Forward requests to HTTP function service
+            ExitCode = HttpExecute();
+            break;
+
+        case kDirect:
+            // Not currently supported
+            ExitCode = LocalMessagingExecute();
+            break;
+
+        case kInproc:
+            // Handle execution in-core (by spawning child processes)
+            ExitCode = InProcessExecute();
+            break;
+
+        case kDump:
+            // Dump high level information about actions to console
+            ExitCode = DumpWorkItems();
+            break;
+
+        case kBeacon:
+            ExitCode = BeaconExecute();
+            break;
+
+        case kBuildLog:
+            ExitCode = BuildActionsLog();
+            break;
+
+        default:
+            ZEN_ERROR("Unknown operating mode! No work submitted");
+
+            ExitCode = 1;
+    }
+
+    ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+    if (ExitCode == 0)
+    {
+        ZEN_CONSOLE("all work items completed successfully");
+    }
+    else
+    {
+        ZEN_CONSOLE("some work items failed (code {})", ExitCode);
+    }
+}
+
+// Replays the recording through a function service session that uses a local
+// runner, with ".zen_temp" (resolved against the current directory) as its
+// working directory. Returns the result of ExecUsingSession().
+int
+ExecCommand::InProcessExecute()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Chunks = *m_ChunkResolver;
+
+    zen::compute::FunctionServiceSession Session(Chunks);
+
+    std::filesystem::path ScratchDir = std::filesystem::absolute(".zen_temp");
+    Session.AddLocalRunner(Chunks, ScratchDir);
+
+    const int Result = ExecUsingSession(Session);
+    return Result;
+}
+
+// Registers all recorded workers with the session, submits the recorded
+// actions selected by --offset/--stride/--limit, and then waits until every
+// submitted action has been retired.
+//
+// Returns 0 when every selected action was submitted and its result could be
+// retrieved, 1 otherwise.
+int
+ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession)
+{
+    // Lock-protected set of LSNs which have been submitted but not yet retired
+    struct JobTracker
+    {
+    public:
+        inline void Insert(int LsnField)
+        {
+            RwLock::ExclusiveLockScope _(Lock);
+            PendingJobs.insert(LsnField);
+        }
+
+        inline bool IsEmpty() const
+        {
+            RwLock::SharedLockScope _(Lock);
+            return PendingJobs.empty();
+        }
+
+        inline void Remove(int CompleteLsn)
+        {
+            RwLock::ExclusiveLockScope _(Lock);
+            PendingJobs.erase(CompleteLsn);
+        }
+
+        inline size_t GetSize() const
+        {
+            RwLock::SharedLockScope _(Lock);
+            return PendingJobs.size();
+        }
+
+    private:
+        mutable RwLock          Lock;
+        std::unordered_set<int> PendingJobs;
+    };
+
+    JobTracker PendingJobs;
+
+    // The action callback below is handed to IterateActions together with a
+    // parallelism argument of 8 - presumably a worker thread count (TODO confirm).
+    // All state shared with the callback is therefore atomic or lock-protected.
+    std::atomic<int> FailedWorkCounter{0};
+    std::atomic<int> IsDraining{0};
+
+    auto DrainCompletedJobs = [&] {
+        // Only one drain at a time; concurrent callers simply skip
+        if (IsDraining.exchange(1))
+        {
+            return;
+        }
+
+        auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); });
+
+        CbObjectWriter Cbo;
+        FunctionSession.GetCompleted(Cbo);
+
+        if (CbObject Completed = Cbo.Save())
+        {
+            for (auto& It : Completed["completed"sv])
+            {
+                const int32_t CompleteLsn = It.AsInt32();
+
+                CbPackage              ResultPackage;
+                const HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage);
+
+                // Retire the job whatever the outcome. Previously a completed job
+                // whose result could not be fetched stayed pending forever, which
+                // made the wait loop at the end of this function spin indefinitely.
+                PendingJobs.Remove(CompleteLsn);
+
+                if (Response == HttpResponseCode::OK)
+                {
+                    ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize());
+                }
+                else
+                {
+                    ++FailedWorkCounter;
+
+                    ZEN_ERROR("failed to retrieve result for completed LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize());
+                }
+            }
+        }
+    };
+
+    // Describe workers
+
+    ZEN_CONSOLE("describing {} workers", m_WorkerMap.size());
+
+    for (auto& Kv : m_WorkerMap)
+    {
+        CbPackage WorkerDesc = Kv.second;
+
+        FunctionSession.RegisterWorker(WorkerDesc);
+    }
+
+    // Then submit work items
+
+    const size_t        TotalWorkItems = m_RecordingReader->GetActionCount();
+    std::atomic<size_t> RemainingWorkItems{TotalWorkItems};
+    std::atomic<int>    SubmittedWorkItems{0};
+
+    ZEN_CONSOLE("submitting {} work items", TotalWorkItems);
+
+    // Sanitize the replay window: a non-positive stride would never select
+    // anything and a negative offset would skip every item, both of which
+    // previously resulted in a silent "success" without any work being done.
+    const int Stride = (m_Stride > 0) ? m_Stride : 1;
+
+    RwLock ScheduleLock;
+    int    OffsetCounter = (m_Offset > 0) ? m_Offset : 0;
+    int    StrideCounter = Stride;
+
+    auto ShouldSchedule = [&]() -> bool {
+        RwLock::ExclusiveLockScope _(ScheduleLock);
+
+        // A limit of zero (or less) means "no limit"
+        if (m_Limit > 0 && SubmittedWorkItems.load() >= m_Limit)
+        {
+            // Limit reached, ignore
+
+            return false;
+        }
+
+        if (OffsetCounter > 0)
+        {
+            // Still in offset, ignore
+
+            --OffsetCounter;
+
+            return false;
+        }
+
+        // Select every Stride-th item after the offset
+        if (--StrideCounter == 0)
+        {
+            StrideCounter = Stride;
+
+            return true;
+        }
+
+        return false;
+    };
+
+    m_RecordingReader->IterateActions(
+        [&](CbObject ActionObject, const IoHash& ActionId) {
+            // Enqueue job
+
+            Stopwatch SubmitTimer;
+
+            const int Priority = 0;
+
+            // Count every visited action (submitted, skipped or failed) so that
+            // the "remaining" figure actually reaches zero
+            const size_t Remaining = --RemainingWorkItems;
+
+            if (ShouldSchedule())
+            {
+                if (m_VerboseLogging)
+                {
+                    int      AttachmentCount = 0;
+                    uint64_t AttachmentBytes = 0;
+
+                    ActionObject.IterateAttachments([&](CbFieldView Field) {
+                        const IoHash AttachData = Field.AsAttachment();
+
+                        ++AttachmentCount;
+
+                        if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
+                        {
+                            AttachmentBytes += ChunkData.GetSize();
+                        }
+                    });
+
+                    zen::ExtendableStringBuilder<1024> ObjStr;
+                    zen::CompactBinaryToJson(ActionObject, ObjStr);
+                    ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}",
+                                ActionId,
+                                AttachmentCount,
+                                NiceBytes(AttachmentBytes),
+                                ObjStr);
+                }
+
+                if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult =
+                        FunctionSession.EnqueueAction(ActionObject, Priority))
+                {
+                    const int32_t LsnField = EnqueueResult.Lsn;
+
+                    // Track the job before doing anything else to keep the window
+                    // in which a concurrent drain could observe an untracked LSN
+                    // as small as possible.
+                    // NOTE(review): a drain which retires the LSN before this
+                    // insert would still leave it pending - confirm whether the
+                    // session can complete an action that quickly.
+                    PendingJobs.Insert(LsnField);
+
+                    const int Submitted = ++SubmittedWorkItems;
+
+                    if (!m_QuietLogging)
+                    {
+                        ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
+                                    Submitted,
+                                    LsnField,
+                                    NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+                                    Remaining);
+                    }
+                }
+                else
+                {
+                    if (!m_QuietLogging)
+                    {
+                        std::string_view FunctionName       = ActionObject["Function"sv].AsString();
+                        const Guid       FunctionVersion    = ActionObject["FunctionVersion"sv].AsUuid();
+                        const Guid       BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+                        ZEN_ERROR(
+                            "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work "
+                            "descriptor "
+                            "at: 'file://{}'",
+                            std::string(FunctionName),
+                            FunctionVersion,
+                            BuildSystemVersion,
+                            "<null>");
+
+                        EmitFunctionListOnce(m_FunctionList);
+                    }
+
+                    ++FailedWorkCounter;
+                }
+            }
+
+            // Check for completed work
+
+            DrainCompletedJobs();
+        },
+        8);
+
+    // Wait until all pending work is complete
+
+    while (!PendingJobs.IsEmpty())
+    {
+        // TODO: improve this logic
+        zen::Sleep(500);
+
+        DrainCompletedJobs();
+    }
+
+    return (FailedWorkCounter.load() != 0) ? 1 : 0;
+}
+
+// Placeholder for the non-HTTP ("direct") work submission path, which is to be
+// reimplemented using the final transport.
+//
+// Returns non-zero so that the caller does not report a successful run for a
+// mode which did not execute anything.
+int
+ExecCommand::LocalMessagingExecute()
+{
+    ZEN_ERROR("'direct' execution mode is not currently supported - no work submitted");
+
+    return 1;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Replays the recording through a function service session that uses a remote
+// runner addressing m_HostName, with ".zen_temp" (resolved against the current
+// directory) passed as its path argument. Returns the result of
+// ExecUsingSession().
+int
+ExecCommand::HttpExecute()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Chunks = *m_ChunkResolver;
+
+    std::filesystem::path ScratchDir = std::filesystem::absolute(".zen_temp");
+
+    zen::compute::FunctionServiceSession Session(Chunks);
+    Session.AddRemoteRunner(Chunks, ScratchDir, m_HostName);
+
+    const int Result = ExecUsingSession(Session);
+    return Result;
+}
+
+// Same as HttpExecute(), except that the remote runner always targets a fixed
+// local endpoint rather than m_HostName.
+// NOTE(review): the --beacon option (m_BeaconPath) is not consulted here -
+// confirm whether that is intentional.
+int
+ExecCommand::BeaconExecute()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Chunks = *m_ChunkResolver;
+
+    std::filesystem::path ScratchDir = std::filesystem::absolute(".zen_temp");
+
+    const char* const BeaconHost = "http://localhost:8558";
+
+    zen::compute::FunctionServiceSession Session(Chunks);
+    Session.AddRemoteRunner(Chunks, ScratchDir, BeaconHost);
+
+    const int Result = ExecUsingSession(Session);
+    return Result;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Appends one FunctionDefinition to m_FunctionList for every entry in the
+// "functions" field of the given worker description. Each definition carries
+// the function's name and version, the worker's "buildsystem_version" and the
+// id of the worker providing it.
+void
+ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId)
+{
+    const Guid BuildSystem = WorkerDesc["buildsystem_version"sv].AsUuid();
+
+    for (auto& Entry : WorkerDesc["functions"sv])
+    {
+        CbObjectView Description = Entry.AsObjectView();
+
+        std::string_view Name    = Description["name"sv].AsString();
+        const Guid       Version = Description["version"sv].AsUuid();
+
+        m_FunctionList.emplace_back(FunctionDefinition{.FunctionName       = std::string{Name},
+                                                       .FunctionVersion    = Version,
+                                                       .BuildSystemVersion = BuildSystem,
+                                                       .WorkerId           = WorkerId});
+    }
+}
+
+// Prints the function list the first time it is called; later calls do nothing.
+void
+ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList)
+{
+    if (m_FunctionListEmittedOnce)
+    {
+        return;
+    }
+
+    EmitFunctionList(FunctionList);
+
+    m_FunctionListEmittedOnce = true;
+}
+
+// Prints every recorded action as YAML together with its attachment count and
+// the compressed/raw attachment sizes, followed by a summary of how many
+// attachments are referenced more than once. Always returns 0.
+int
+ExecCommand::DumpWorkItems()
+{
+    std::atomic<int> ActionsEmitted{0};
+
+    // Attachment CID -> number of references seen across all actions
+    eastl::hash_map<IoHash, uint64_t> ReferenceCounts;
+
+    auto DumpAction = [&](CbObject ActionObject, const IoHash& ActionId) {
+        eastl::hash_map<IoHash, CompressedBuffer> ActionAttachments;
+
+        uint64_t CompressedBytes = 0;
+        uint64_t RawBytes        = 0;
+
+        ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+            const IoHash Cid   = AttachmentField.GetValue().AsHash();
+            IoBuffer     Chunk = m_ChunkResolver->FindChunkByCid(Cid);
+
+            IoHash           RawHash;
+            uint64_t         RawSize    = 0;
+            CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+
+            ActionAttachments[Cid] = Compressed;
+
+            CompressedBytes += Compressed.GetCompressedSize();
+            RawBytes += Compressed.DecodeRawSize();
+
+            // First reference creates the entry at zero, then counts it
+            ++ReferenceCounts[Cid];
+        });
+
+        zen::ExtendableStringBuilder<1024> Text;
+
+# if 0
+        zen::CompactBinaryToJson(ActionObject, Text);
+        ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, ActionAttachments.size(), Text);
+# else
+        zen::CompactBinaryToYaml(ActionObject, Text);
+        ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}",
+                    ActionId,
+                    ActionAttachments.size(),
+                    CompressedBytes,
+                    RawBytes,
+                    Text);
+# endif
+
+        ++ActionsEmitted;
+    };
+
+    m_RecordingReader->IterateActions(DumpAction, 1);
+
+    ZEN_CONSOLE("emitted: {} actions", ActionsEmitted.load());
+
+    // Group the attachments which are referenced more than once by their
+    // reference count, in ascending order
+    eastl::map<uint64_t, std::vector<IoHash>> CidsByReferenceCount;
+
+    for (const auto& Entry : ReferenceCounts)
+    {
+        if (Entry.second > 1)
+        {
+            CidsByReferenceCount[Entry.second].push_back(Entry.first);
+        }
+    }
+
+    for (const auto& Bucket : CidsByReferenceCount)
+    {
+        ZEN_CONSOLE("{} attachments with {} references", Bucket.second.size(), Bucket.first);
+    }
+
+    return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Intended to replay the recording while writing a new action log to
+// m_RecordingLogPath (--log). Generation itself is not implemented yet; the
+// option validation below runs before ZEN_NOT_IMPLEMENTED is reached.
+//
+// Throws OptionParseException when the recording path or the log directory is
+// missing, or when the log directory already exists.
+int
+ExecCommand::BuildActionsLog()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Resolver = *m_ChunkResolver;
+
+    if (m_RecordingPath.empty())
+    {
+        throw OptionParseException("need to specify recording path", m_Options.help());
+    }
+
+    // The log directory is the input specific to this mode. Without this check
+    // an omitted --log would pass the exists() test below with an empty path.
+    if (m_RecordingLogPath.empty())
+    {
+        throw OptionParseException("need to specify recording log directory (--log)", m_Options.help());
+    }
+
+    // Refuse to write into an existing log directory
+    if (std::filesystem::exists(m_RecordingLogPath))
+    {
+        throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help());
+    }
+
+    ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!");
+
+    zen::compute::FunctionServiceSession FunctionSession(Resolver);
+    FunctionSession.StartRecording(Resolver, m_RecordingLogPath);
+
+    return ExecUsingSession(FunctionSession);
+}
+
+// Prints a table of the given function definitions to the console: a banner,
+// a header row, one row per function and a closing rule.
+void
+ExecCommand::EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList)
+{
+    ZEN_CONSOLE("=== Known functions:\n===========================");
+
+    // Header row uses the same column widths as the data rows below
+    ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id");
+
+    for (const FunctionDefinition& Definition : FunctionList)
+    {
+        ZEN_CONSOLE("{:30} {:36} {:36} {}",
+                    Definition.FunctionName,
+                    Definition.FunctionVersion,
+                    Definition.BuildSystemVersion,
+                    Definition.WorkerId);
+    }
+
+    ZEN_CONSOLE("===========================");
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES