diff options
| author | zousar <[email protected]> | 2026-02-18 23:19:14 -0700 |
|---|---|---|
| committer | zousar <[email protected]> | 2026-02-18 23:19:14 -0700 |
| commit | 2ba28acaf034722452f82cfb07afc0a4bb90eeab (patch) | |
| tree | c00dea385597180673be6e02aca6c07d9ef6ec00 /src/zen/cmds/exec_cmd.cpp | |
| parent | updatefrontend (diff) | |
| parent | structured compute basics (#714) (diff) | |
| download | archived-zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.tar.xz archived-zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.zip | |
Merge branch 'main' into zs/web-ui-improvements
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/exec_cmd.cpp | 654 |
1 files changed, 654 insertions, 0 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp new file mode 100644 index 000000000..2d9d0d12e --- /dev/null +++ b/src/zen/cmds/exec_cmd.cpp @@ -0,0 +1,654 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "exec_cmd.h" + +#include <zencompute/functionservice.h> +#include <zencompute/recordingreader.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/string.h> +#include <zencore/timer.h> + +#include <EASTL/hash_map.h> +#include <EASTL/hash_set.h> +#include <EASTL/map.h> + +using namespace std::literals; + +namespace eastl { + +template<> +struct hash<zen::IoHash> : public zen::IoHash::Hasher +{ +}; + +} // namespace eastl + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen { + +ExecCommand::ExecCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), "<hosturl>"); + m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>"); + m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>"); + m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>"); + m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>"); + m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>"); + m_Options.add_option("", + "", + "mode", + "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", + cxxopts::value(m_Mode)->default_value("http"), + "<string>"); + m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>"); + m_Options.parse_positional("mode"); +} + +ExecCommand::~ExecCommand() +{ +} + +void +ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + // Configure + + if (!ParseOptions(argc, argv)) + { + return; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_RecordingPath.empty()) + { + throw OptionParseException("replay path is required!", m_Options.help()); + } + + m_VerboseLogging = GlobalOptions.IsVerbose; + m_QuietLogging = m_Quiet && !m_VerboseLogging; + + enum ExecMode + { + kHttp, + kDirect, + kInproc, + kDump, + kBeacon, + kBuildLog + } Mode; + + if (m_Mode == "http"sv) + { + Mode = kHttp; + } + else if (m_Mode == "direct"sv) + { + Mode = kDirect; + } + else if (m_Mode == "inproc"sv) + { + Mode = kInproc; + } + else if (m_Mode == "dump"sv) + { + Mode = kDump; + } + else if (m_Mode == "beacon"sv) + { + Mode = kBeacon; + } + else if (m_Mode == "buildlog"sv) + { + Mode = kBuildLog; + } + else + { + throw OptionParseException("invalid mode specified!", m_Options.help()); + } + + // Gather information from recording path + + std::unique_ptr<zen::compute::RecordingReader> Reader; + std::unique_ptr<zen::compute::UeRecordingReader> UeReader; + + std::filesystem::path RecordingPath{m_RecordingPath}; + + if (!std::filesystem::is_directory(RecordingPath)) + { + throw OptionParseException("replay path should be a directory path!", m_Options.help()); + } + else + { + if (std::filesystem::is_directory(RecordingPath / "cid")) + { + Reader = std::make_unique<zen::compute::RecordingReader>(RecordingPath); + m_WorkerMap = Reader->ReadWorkers(); + m_ChunkResolver = Reader.get(); + m_RecordingReader = Reader.get(); + } + else + { + UeReader = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath); + m_WorkerMap = UeReader->ReadWorkers(); + m_ChunkResolver = UeReader.get(); + m_RecordingReader = UeReader.get(); + } + } + + ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); + + for (auto& Kv : m_WorkerMap) + { + CbObject WorkerDesc = Kv.second.GetObject(); + const IoHash& WorkerId = Kv.first; + + RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); + + if (m_VerboseLogging) + { + zen::ExtendableStringBuilder<1024> ObjStr; +# if 0 + zen::CompactBinaryToJson(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); +# else + zen::CompactBinaryToYaml(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); +# endif + } + } + + if (m_VerboseLogging) + { + EmitFunctionList(m_FunctionList); + } + + // Iterate over work items and dispatch or log them + + int ReturnValue = 0; + + Stopwatch ExecTimer; + + switch (Mode) + { + case kHttp: + // Forward requests to HTTP function service + ReturnValue = HttpExecute(); + break; + + case kDirect: + // Not currently supported + ReturnValue = LocalMessagingExecute(); + break; + + case kInproc: + // Handle execution in-core (by spawning child processes) + ReturnValue = InProcessExecute(); + break; + + case kDump: + // Dump high level information about actions to console + ReturnValue = DumpWorkItems(); + break; + + case kBeacon: + ReturnValue = BeaconExecute(); + break; + + case kBuildLog: + ReturnValue = BuildActionsLog(); + break; + + default: + ZEN_ERROR("Unknown operating mode! No work submitted"); + + ReturnValue = 1; + } + + ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); + + if (!ReturnValue) + { + ZEN_CONSOLE("all work items completed successfully"); + } + else + { + ZEN_CONSOLE("some work items failed (code {})", ReturnValue); + } +} + +int +ExecCommand::InProcessExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + FunctionSession.AddLocalRunner(Resolver, TempPath); + + return ExecUsingSession(FunctionSession); +} + +int +ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession) +{ + struct JobTracker + { + public: + inline void Insert(int LsnField) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.insert(LsnField); + } + + inline bool IsEmpty() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.empty(); + } + + inline void Remove(int CompleteLsn) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.erase(CompleteLsn); + } + + inline size_t GetSize() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.size(); + } + + private: + mutable RwLock Lock; + std::unordered_set<int> PendingJobs; + }; + + JobTracker PendingJobs; + + std::atomic<int> IsDraining{0}; + + auto DrainCompletedJobs = [&] { + if (IsDraining.exchange(1)) + { + return; + } + + auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); + + CbObjectWriter Cbo; + FunctionSession.GetCompleted(Cbo); + + if (CbObject Completed = Cbo.Save()) + { + for (auto& It : Completed["completed"sv]) + { + int32_t CompleteLsn = It.AsInt32(); + + CbPackage ResultPackage; + HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); + + if (Response == HttpResponseCode::OK) + { + PendingJobs.Remove(CompleteLsn); + + ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); + } + } + } + }; + + // Describe workers + + ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); + + for (auto Kv : m_WorkerMap) + { + CbPackage WorkerDesc = Kv.second; + + FunctionSession.RegisterWorker(WorkerDesc); + } + + // Then submit work items + + int FailedWorkCounter = 0; + size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); + int SubmittedWorkItems = 0; + + ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + + int OffsetCounter = m_Offset; + int StrideCounter = m_Stride; + + auto ShouldSchedule = [&]() -> bool { + if (m_Limit && SubmittedWorkItems >= m_Limit) + { + // Limit reached, ignore + + return false; + } + + if (OffsetCounter && OffsetCounter--) + { + // Still in offset, ignore + + return false; + } + + if (--StrideCounter == 0) + { + StrideCounter = m_Stride; + + return true; + } + + return false; + }; + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + // Enqueue job + + Stopwatch SubmitTimer; + + const int Priority = 0; + + if (ShouldSchedule()) + { + if (m_VerboseLogging) + { + int AttachmentCount = 0; + uint64_t AttachmentBytes = 0; + eastl::hash_set<IoHash> ReferencedChunks; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsAttachment(); + + ReferencedChunks.insert(AttachData); + ++AttachmentCount; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) + { + AttachmentBytes += ChunkData.GetSize(); + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", + ActionId, + AttachmentCount, + NiceBytes(AttachmentBytes), + ObjStr); + } + + if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult = + FunctionSession.EnqueueAction(ActionObject, Priority)) + { + const int32_t LsnField = EnqueueResult.Lsn; + + --RemainingWorkItems; + ++SubmittedWorkItems; + + if (!m_QuietLogging) + { + ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", + SubmittedWorkItems, + LsnField, + NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), + RemainingWorkItems); + } + + PendingJobs.Insert(LsnField); + } + else + { + if (!m_QuietLogging) + { + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + ZEN_ERROR( + "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " + "descriptor " + "at: 'file://{}'", + std::string(FunctionName), + FunctionVersion, + BuildSystemVersion, + "<null>"); + + EmitFunctionListOnce(m_FunctionList); + } + + ++FailedWorkCounter; + } + } + + // Check for completed work + + DrainCompletedJobs(); + }, + 8); + + // Wait until all pending work is complete + + while (!PendingJobs.IsEmpty()) + { + // TODO: improve this logic + zen::Sleep(500); + + DrainCompletedJobs(); + } + + if (FailedWorkCounter) + { + return 1; + } + + return 0; +} + +int +ExecCommand::LocalMessagingExecute() +{ + // Non-HTTP work submission path + + // To be reimplemented using final transport + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::HttpExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.AddRemoteRunner(Resolver, TempPath, m_HostName); + + return ExecUsingSession(FunctionSession); +} + +int +ExecCommand::BeaconExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); + // FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://10.99.9.246:8558"); + + return ExecUsingSession(FunctionSession); +} + +////////////////////////////////////////////////////////////////////////// + +void +ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) +{ + const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerDesc["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } +} + +void +ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList) +{ + if (m_FunctionListEmittedOnce == false) + { + EmitFunctionList(FunctionList); + + m_FunctionListEmittedOnce = true; + } +} + +int +ExecCommand::DumpWorkItems() +{ + std::atomic<int> EmittedCount{0}; + + eastl::hash_map<IoHash, uint64_t> SeenAttachments; // Attachment CID -> count of references + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + eastl::hash_map<IoHash, CompressedBuffer> Attachments; + + uint64_t AttachmentBytes = 0; + uint64_t UncompressedAttachmentBytes = 0; + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + Attachments[AttachmentCid] = CompressedData; + + AttachmentBytes += CompressedData.GetCompressedSize(); + UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); + + if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) + { + ++Iter->second; + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + +# if 0 + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); +# else + zen::CompactBinaryToYaml(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", + ActionId, + Attachments.size(), + AttachmentBytes, + UncompressedAttachmentBytes, + ObjStr); +# endif + + ++EmittedCount; + }, + 1); + + ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); + + eastl::map<uint64_t, std::vector<IoHash>> ReferenceHistogram; + + for (const auto& [K, V] : SeenAttachments) + { + if (V > 1) + { + ReferenceHistogram[V].push_back(K); + } + } + + for (const auto& [RefCount, Cids] : ReferenceHistogram) + { + ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); + } + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::BuildActionsLog() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + if (m_RecordingPath.empty()) + { + throw OptionParseException("need to specify recording path", m_Options.help()); + } + + if (std::filesystem::exists(m_RecordingLogPath)) + { + throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help()); + } + + ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.StartRecording(Resolver, m_RecordingLogPath); + + return ExecUsingSession(FunctionSession); +} + +void +ExecCommand::EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList) +{ + ZEN_CONSOLE("=== Known functions:\n==========================="); + + ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); + + for (const FunctionDefinition& Func : FunctionList) + { + ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); + } + + ZEN_CONSOLE("==========================="); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES |