// Copyright Epic Games, Inc. All Rights Reserved. #include "exec_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "consoleprogress.h" #include #include #include using namespace std::literals; namespace eastl { template<> struct hash : public zen::IoHash::Hasher { }; } // namespace eastl #if ZEN_WITH_COMPUTE_SERVICES namespace zen {
namespace {

/**
 * Escape text so it can be placed in HTML element content or inside a quoted
 * attribute value.
 *
 * The five characters that carry meaning in markup are replaced by character
 * references: & -> &amp;  < -> &lt;  > -> &gt;  " -> &quot;  ' -> &#39;
 * Every other byte is copied through unchanged.
 *
 * @param Input Text to escape; may be empty.
 * @return Escaped copy of Input.
 */
static std::string
EscapeHtml(std::string_view Input)
{
	std::string Out;
	// Lower bound only: each escaped character expands to several bytes
	Out.reserve(Input.size());

	for (char C : Input)
	{
		switch (C)
		{
			case '&':
				Out += "&amp;";
				break;
			case '<':
				Out += "&lt;";
				break;
			case '>':
				Out += "&gt;";
				break;
			case '"':
				Out += "&quot;";
				break;
			case '\'':
				// Numeric reference: understood by every HTML version, unlike &apos;
				Out += "&#39;";
				break;
			default:
				Out += C;
				break;
		}
	}

	return Out;
}

/**
 * Escape text for use inside a JSON string literal (the surrounding quotes are
 * not added).
 *
 * Quote, backslash, newline, carriage return and tab get their two-character
 * escapes. Any remaining byte below 0x20 is written as \u00XX (lowercase hex).
 * All other bytes, including those >= 0x80, are copied through unchanged.
 *
 * @param Input Text to escape; may be empty.
 * @return Escaped copy of Input.
 */
static std::string
EscapeJson(std::string_view Input)
{
	static constexpr char HexDigits[] = "0123456789abcdef";

	std::string Out;
	Out.reserve(Input.size());

	for (char C : Input)
	{
		switch (C)
		{
			case '"':
				Out += "\\\"";
				break;
			case '\\':
				Out += "\\\\";
				break;
			case '\n':
				Out += "\\n";
				break;
			case '\r':
				Out += "\\r";
				break;
			case '\t':
				Out += "\\t";
				break;
			default:
			{
				// char may be signed: compare as unsigned so bytes >= 0x80 are not
				// mistaken for control characters
				const unsigned char Byte = static_cast<unsigned char>(C);
				if (Byte < 0x20)
				{
					// Byte < 0x20, so the upper two hex digits are always zero;
					// appending digits directly avoids a temporary string per character
					Out += "\\u00";
					Out += HexDigits[Byte >> 4];
					Out += HexDigits[Byte & 0x0F];
				}
				else
				{
					Out += C;
				}
				break;
			}
		}
	}

	return Out;
}

}  // namespace
////////////////////////////////////////////////////////////////////////// // ExecSessionConfig - read-only configuration for a session run struct ExecSessionConfig { zen::ChunkResolver& Resolver; zen::compute::RecordingReaderBase& RecordingReader; const std::unordered_map& WorkerMap; std::vector& FunctionList; // mutable for EmitFunctionListOnce std::string_view OrchestratorUrl; const std::filesystem::path& OutputPath; int Offset = 0; int Stride = 1; int Limit = 0; bool Verbose = false; bool Quiet = false; bool DumpActions = false; bool Binary = false; ConsoleProgressMode ProgressMode = ConsoleProgressMode::Pretty; }; ////////////////////////////////////////////////////////////////////////// // ExecSessionRunner - owns
per-run state, drives the session lifecycle class ExecSessionRunner { public: ExecSessionRunner(zen::compute::ComputeServiceSession& Session, const ExecSessionConfig& Config); int Run(); private: // Types struct JobTracker { public: void Insert(int LsnField); bool IsEmpty() const; void Remove(int CompleteLsn); size_t GetSize() const; private: mutable RwLock Lock; std::unordered_set PendingJobs; }; struct ActionSummaryEntry { int32_t Lsn = 0; int RecordingIndex = 0; IoHash ActionId; std::string FunctionName; int InputAttachments = 0; uint64_t InputBytes = 0; int OutputAttachments = 0; uint64_t OutputBytes = 0; float WallSeconds = 0.0f; float CpuSeconds = 0.0f; uint64_t SubmittedTicks = 0; uint64_t StartedTicks = 0; std::string ExecutionLocation; }; // Methods std::string RegisterOrchestratorClient(); void SendOrchestratorHeartbeat(); void CompleteOrchestratorClient(); void DrainCompletedJobs(); void SaveResultOutput(int32_t CompleteLsn, CbPackage& ResultPackage); void SaveActionOutput(int32_t Lsn, int RecordingIndex, const IoHash& ActionId, const CbObject& ActionObject); void WriteSummaryFiles(); void EmitFunctionListOnce(); // State zen::compute::ComputeServiceSession& m_Session; ExecSessionConfig m_Config; JobTracker m_PendingJobs; std::mutex m_SummaryLock; std::unordered_map m_SummaryEntries; int m_QueueId = 0; std::string m_OrchestratorClientId; Stopwatch m_OrchestratorHeartbeatTimer; bool m_FunctionListEmittedOnce = false; std::atomic m_IsDraining{0}; }; ////////////////////////////////////////////////////////////////////////// // ExecSessionRunner::JobTracker void ExecSessionRunner::JobTracker::Insert(int LsnField) { RwLock::ExclusiveLockScope _(Lock); PendingJobs.insert(LsnField); } bool ExecSessionRunner::JobTracker::IsEmpty() const { RwLock::SharedLockScope _(Lock); return PendingJobs.empty(); } void ExecSessionRunner::JobTracker::Remove(int CompleteLsn) { RwLock::ExclusiveLockScope _(Lock); PendingJobs.erase(CompleteLsn); } size_t 
ExecSessionRunner::JobTracker::GetSize() const { RwLock::SharedLockScope _(Lock); return PendingJobs.size(); } ////////////////////////////////////////////////////////////////////////// // ExecSessionRunner implementation ExecSessionRunner::ExecSessionRunner(zen::compute::ComputeServiceSession& Session, const ExecSessionConfig& Config) : m_Session(Session) , m_Config(Config) { } std::string ExecSessionRunner::RegisterOrchestratorClient() { if (m_Config.OrchestratorUrl.empty()) { return {}; } try { HttpClient OrchestratorClient(m_Config.OrchestratorUrl); CbObjectWriter Ann; Ann << "session_id"sv << GetSessionId(); Ann << "hostname"sv << std::string_view(GetMachineName()); CbObjectWriter Meta; Meta << "source"sv << "zen-exec"sv; Ann << "metadata"sv << Meta.Save(); auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save()); if (Resp.IsSuccess()) { std::string ClientId = std::string(Resp.AsObject()["id"].AsString()); ZEN_CONSOLE_INFO("registered with orchestrator as {}", ClientId); return ClientId; } else { ZEN_WARN("failed to register with orchestrator (status {})", static_cast(Resp.StatusCode)); } } catch (const std::exception& Ex) { ZEN_WARN("failed to register with orchestrator: {}", Ex.what()); } return {}; } void ExecSessionRunner::SendOrchestratorHeartbeat() { if (m_OrchestratorClientId.empty() || m_OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000) { return; } m_OrchestratorHeartbeatTimer.Reset(); try { HttpClient OrchestratorClient(m_Config.OrchestratorUrl); std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", m_OrchestratorClientId)); } catch (...) { } } void ExecSessionRunner::CompleteOrchestratorClient() { if (!m_OrchestratorClientId.empty()) { try { HttpClient OrchestratorClient(m_Config.OrchestratorUrl); std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", m_OrchestratorClientId)); } catch (...) 
{ } } } void ExecSessionRunner::DrainCompletedJobs() { if (m_IsDraining.exchange(1)) { return; } auto _ = MakeGuard([&] { m_IsDraining.store(0, std::memory_order_release); }); CbObjectWriter Cbo; m_Session.GetQueueCompleted(m_QueueId, Cbo); if (CbObject Completed = Cbo.Save()) { for (auto& It : Completed["completed"sv]) { int32_t CompleteLsn = It.AsInt32(); CbPackage ResultPackage; HttpResponseCode Response = m_Session.GetActionResult(CompleteLsn, /* out */ ResultPackage); if (Response == HttpResponseCode::OK) { if (!m_Config.OutputPath.empty() && ResultPackage) { SaveResultOutput(CompleteLsn, ResultPackage); } m_PendingJobs.Remove(CompleteLsn); } } } } void ExecSessionRunner::SaveResultOutput(int32_t CompleteLsn, CbPackage& ResultPackage) { int OutputAttachments = 0; uint64_t OutputBytes = 0; if (!m_Config.Binary) { // Write the root object as YAML ExtendableStringBuilder<4096> YamlStr; CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr); std::string_view Yaml = YamlStr; zen::WriteFile(m_Config.OutputPath / fmt::format("{}.result.yaml", CompleteLsn), IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); // Write decompressed attachments auto Attachments = ResultPackage.GetAttachments(); if (!Attachments.empty()) { std::filesystem::path AttDir = m_Config.OutputPath / fmt::format("{}.result.attachments", CompleteLsn); zen::CreateDirectories(AttDir); for (const CbAttachment& Att : Attachments) { ++OutputAttachments; IoHash AttHash = Att.GetHash(); if (Att.IsCompressedBinary()) { SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress(); OutputBytes += Decompressed.GetSize(); zen::WriteFile(AttDir / AttHash.ToHexString(), IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); } else { SharedBuffer Binary = Att.AsBinary(); OutputBytes += Binary.GetSize(); zen::WriteFile(AttDir / AttHash.ToHexString(), IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize())); } } } if (!m_Config.Quiet) { ZEN_CONSOLE("saved result: {}/{}.result.yaml 
({} attachments)", m_Config.OutputPath.string(), CompleteLsn, OutputAttachments); } } else { CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage); zen::WriteFile(m_Config.OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized)); for (const CbAttachment& Att : ResultPackage.GetAttachments()) { ++OutputAttachments; OutputBytes += Att.AsBinary().GetSize(); } if (!m_Config.Quiet) { ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_Config.OutputPath.string(), CompleteLsn); } } std::lock_guard Lock(m_SummaryLock); if (auto It2 = m_SummaryEntries.find(CompleteLsn); It2 != m_SummaryEntries.end()) { It2->second.OutputAttachments = OutputAttachments; It2->second.OutputBytes = OutputBytes; } } void ExecSessionRunner::SaveActionOutput(int32_t Lsn, int RecordingIndex, const IoHash& ActionId, const CbObject& ActionObject) { ActionSummaryEntry Entry; Entry.Lsn = Lsn; Entry.RecordingIndex = RecordingIndex; Entry.ActionId = ActionId; Entry.FunctionName = std::string(ActionObject["Function"sv].AsString()); if (!m_Config.Binary) { // Write action object as YAML ExtendableStringBuilder<4096> YamlStr; CompactBinaryToYaml(ActionObject, YamlStr); std::string_view Yaml = YamlStr; zen::WriteFile(m_Config.OutputPath / fmt::format("{}.action.yaml", Lsn), IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); // Write decompressed input attachments std::filesystem::path AttDir = m_Config.OutputPath / fmt::format("{}.action.attachments", Lsn); bool AttDirCreated = false; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachCid = Field.AsAttachment(); ++Entry.InputAttachments; if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachCid)) { IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); SharedBuffer Decompressed = Compressed.Decompress(); Entry.InputBytes += Decompressed.GetSize(); if (!AttDirCreated) { zen::CreateDirectories(AttDir); 
AttDirCreated = true; } zen::WriteFile(AttDir / AttachCid.ToHexString(), IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); } }); if (!m_Config.Quiet) { ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", m_Config.OutputPath.string(), Lsn, Entry.InputAttachments); } } else { // Build a CbPackage from the action and write as .pkg CbPackage ActionPackage; ActionPackage.SetObject(ActionObject); ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachCid = Field.AsAttachment(); ++Entry.InputAttachments; if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachCid)) { IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); Entry.InputBytes += ChunkData.GetSize(); ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); } }); CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage); zen::WriteFile(m_Config.OutputPath / fmt::format("{}.action.pkg", Lsn), std::move(Serialized)); if (!m_Config.Quiet) { ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_Config.OutputPath.string(), Lsn); } } std::lock_guard Lock(m_SummaryLock); m_SummaryEntries.emplace(Lsn, std::move(Entry)); } void ExecSessionRunner::WriteSummaryFiles() { if (m_Config.OutputPath.empty() || m_SummaryEntries.empty()) { return; } // Merge timing data from queue history into summary entries // RunnerAction::State indices (can't include functionrunner.h from here) constexpr int kStateNew = 0; constexpr int kStatePending = 1; constexpr int kStateRunning = 3; constexpr int kStateCompleted = 4; // first terminal state constexpr int kStateCount = 8; for (const auto& HistEntry : m_Session.GetQueueHistory(m_QueueId, 0)) { std::lock_guard Lock(m_SummaryLock); if (auto It = m_SummaryEntries.find(HistEntry.Lsn); It != m_SummaryEntries.end()) { // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled) uint64_t EndTick = 0; for 
(int S = kStateCompleted; S < kStateCount; ++S) { if (HistEntry.Timestamps[S] != 0) { EndTick = HistEntry.Timestamps[S]; break; } } uint64_t StartTick = HistEntry.Timestamps[kStateNew]; if (EndTick > StartTick) { It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond)); } It->second.CpuSeconds = HistEntry.CpuSeconds; It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending]; It->second.StartedTicks = HistEntry.Timestamps[kStateRunning]; It->second.ExecutionLocation = HistEntry.ExecutionLocation; } } // Sort entries by recording index std::vector Sorted; Sorted.reserve(m_SummaryEntries.size()); for (auto& [_, Entry] : m_SummaryEntries) { Sorted.push_back(std::move(Entry)); } std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) { return A.RecordingIndex < B.RecordingIndex; }); auto FormatTimestamp = [](uint64_t Ticks) -> std::string { if (Ticks == 0) { return "-"; } return DateTime(Ticks).ToString("%H:%M:%S.%s"); }; // Write summary.txt ExtendableStringBuilder<4096> Summary; Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n", "LSN", "Index", "ActionId", "Function", "InAtt", "InBytes", "OutAtt", "OutBytes", "Wall(s)", "CPU(s)", "Submitted", "Started", "Location")); Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n", "", "", "", "", "", "", "", "", "", "", "", "", "")); for (const ActionSummaryEntry& Entry : Sorted) { Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n", Entry.Lsn, Entry.RecordingIndex, Entry.ActionId, Entry.FunctionName, Entry.InputAttachments, NiceBytes(Entry.InputBytes), Entry.OutputAttachments, NiceBytes(Entry.OutputBytes), Entry.WallSeconds, Entry.CpuSeconds, FormatTimestamp(Entry.SubmittedTicks), FormatTimestamp(Entry.StartedTicks), 
Entry.ExecutionLocation)); } std::filesystem::path SummaryPath = m_Config.OutputPath / "summary.txt"; std::string_view SummaryStr = Summary; zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size())); ZEN_CONSOLE("wrote summary to {}", SummaryPath.string()); // Write summary.html if (!m_Config.Binary) { ExtendableStringBuilder<8192> Html; Html.Append(std::string_view(R"( Exec Summary

Exec Summary

LSN Index Action ID Function In Attachments In Bytes Out Attachments Out Bytes Wall(s) CPU(s) Submitted Started Location
)JS")); std::filesystem::path HtmlPath = m_Config.OutputPath / "summary.html"; std::string_view HtmlStr = Html; zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size())); ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string()); } } void ExecSessionRunner::EmitFunctionListOnce() { if (!m_FunctionListEmittedOnce) { ExecCommand::EmitFunctionList(m_Config.FunctionList); m_FunctionListEmittedOnce = true; } } int ExecSessionRunner::Run() { m_Session.WaitUntilReady(); // Register as a client with the orchestrator (best-effort) m_OrchestratorClientId = RegisterOrchestratorClient(); auto ClientCleanup = MakeGuard([&] { CompleteOrchestratorClient(); }); // Create a queue to group all actions from this exec session CbObjectWriter Metadata; Metadata << "source"sv << "zen-exec"sv; auto QueueResult = m_Session.CreateQueue("zen-exec", Metadata.Save()); const int QueueId = QueueResult.QueueId; if (!QueueId) { ZEN_ERROR("failed to create compute queue"); return 1; } m_QueueId = QueueId; auto QueueCleanup = MakeGuard([&] { m_Session.DeleteQueue(QueueId); }); if (!m_Config.OutputPath.empty()) { zen::CreateDirectories(m_Config.OutputPath); } // Describe workers ZEN_CONSOLE("describing {} workers", m_Config.WorkerMap.size()); for (auto Kv : m_Config.WorkerMap) { CbPackage WorkerDesc = Kv.second; m_Session.RegisterWorker(WorkerDesc); } // Then submit work items std::atomic FailedWorkCounter{0}; std::atomic RemainingWorkItems{m_Config.RecordingReader.GetActionCount()}; std::atomic SubmittedWorkItems{0}; size_t TotalWorkItems = RemainingWorkItems.load(); std::unique_ptr ProgressOwner(CreateConsoleProgress(m_Config.ProgressMode)); std::unique_ptr SubmitProgress = ProgressOwner->CreateProgressBar("Submit"); SubmitProgress->UpdateState( {.Task = "Submitting work items", .TotalCount = TotalWorkItems, .RemainingCount = RemainingWorkItems.load()}, false); int OffsetCounter = m_Config.Offset; int StrideCounter = m_Config.Stride; auto ShouldSchedule = [&]() -> bool { 
if (m_Config.Limit && SubmittedWorkItems.load() >= m_Config.Limit) { // Limit reached, ignore return false; } if (OffsetCounter && OffsetCounter--) { // Still in offset, ignore return false; } if (--StrideCounter == 0) { StrideCounter = m_Config.Stride; return true; } return false; }; int TargetParallelism = 8; if (OffsetCounter || StrideCounter || m_Config.Limit) { TargetParallelism = 1; } std::atomic RecordingIndex{0}; m_Config.RecordingReader.IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { // Enqueue job const int CurrentRecordingIndex = RecordingIndex++; Stopwatch SubmitTimer; const int Priority = 0; if (ShouldSchedule()) { if (m_Config.Verbose) { int AttachmentCount = 0; uint64_t AttachmentBytes = 0; eastl::hash_set ReferencedChunks; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachData = Field.AsAttachment(); ReferencedChunks.insert(AttachData); ++AttachmentCount; if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachData)) { AttachmentBytes += ChunkData.GetSize(); } }); zen::ExtendableStringBuilder<1024> ObjStr; zen::CompactBinaryToJson(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); } if (m_Config.DumpActions) { int AttachmentCount = 0; uint64_t AttachmentBytes = 0; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachData = Field.AsAttachment(); ++AttachmentCount; if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachData)) { AttachmentBytes += ChunkData.GetSize(); } }); zen::ExtendableStringBuilder<1024> ObjStr; zen::CompactBinaryToYaml(ActionObject, ObjStr); ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); } if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult = m_Session.EnqueueActionToQueue(QueueId, ActionObject, Priority)) { const int32_t LsnField = EnqueueResult.Lsn; size_t Remaining = 
--RemainingWorkItems; int Submitted = ++SubmittedWorkItems; SubmitProgress->UpdateState({.Task = "Submitting work items", .Details = fmt::format("#{} LSN {}", Submitted, LsnField), .TotalCount = TotalWorkItems, .RemainingCount = Remaining}, false); if (!m_Config.OutputPath.empty()) { SaveActionOutput(LsnField, CurrentRecordingIndex, ActionId, ActionObject); } m_PendingJobs.Insert(LsnField); } else { if (!m_Config.Quiet) { std::string_view FunctionName = ActionObject["Function"sv].AsString(); const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); ZEN_ERROR( "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " "descriptor " "at: 'file://{}'", std::string(FunctionName), FunctionVersion, BuildSystemVersion, ""); EmitFunctionListOnce(); } ++FailedWorkCounter; } } // Check for completed work DrainCompletedJobs(); SendOrchestratorHeartbeat(); }, TargetParallelism); SubmitProgress->Finish(); // Wait until all pending work is complete size_t TotalPendingJobs = m_PendingJobs.GetSize(); std::unique_ptr CompletionProgress = ProgressOwner->CreateProgressBar("Execute"); while (!m_PendingJobs.IsEmpty()) { size_t PendingCount = m_PendingJobs.GetSize(); CompletionProgress->UpdateState( {.Task = "Executing work items", .Details = fmt::format("{} completed, {} remaining", TotalPendingJobs - PendingCount, PendingCount), .TotalCount = TotalPendingJobs, .RemainingCount = PendingCount}, false); zen::Sleep(ProgressOwner->GetProgressUpdateDelayMS()); DrainCompletedJobs(); SendOrchestratorHeartbeat(); } CompletionProgress->Finish(); // Write summary files WriteSummaryFiles(); if (FailedWorkCounter.load()) { return 1; } return 0; } ////////////////////////////////////////////////////////////////////////// // ExecHttpSubCmd ExecHttpSubCmd::ExecHttpSubCmd(ExecCommand& Parent) : ZenSubCmdBase("http", "Forward requests to HTTP compute service"), 
m_Parent(Parent) { SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName), ""); } void ExecHttpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); ZEN_ASSERT(m_Parent.m_ChunkResolver); ChunkResolver& Resolver = *m_Parent.m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName); Stopwatch ExecTimer; int ReturnValue = m_Parent.RunSession(ComputeSession); ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); if (!ReturnValue) { ZEN_CONSOLE("all work items completed successfully"); } else { ZEN_CONSOLE("some work items failed (code {})", ReturnValue); } } ////////////////////////////////////////////////////////////////////////// // ExecInprocSubCmd ExecInprocSubCmd::ExecInprocSubCmd(ExecCommand& Parent) : ZenSubCmdBase("inproc", "Handle execution in-process"), m_Parent(Parent) { m_SubOptions.add_option("managed", "", "managed", "Use managed local runner (if supported)", cxxopts::value(m_Managed), ""); } void ExecInprocSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ZEN_ASSERT(m_Parent.m_ChunkResolver); ChunkResolver& Resolver = *m_Parent.m_ChunkResolver; zen::compute::ComputeServiceSession ComputeSession(Resolver); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); if (m_Managed) { ZEN_CONSOLE_INFO("using managed local runner"); ComputeSession.AddManagedLocalRunner(Resolver, TempPath); } else { ZEN_CONSOLE_INFO("using local runner"); ComputeSession.AddLocalRunner(Resolver, TempPath); } Stopwatch ExecTimer; int ReturnValue = m_Parent.RunSession(ComputeSession); ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); if (!ReturnValue) { ZEN_CONSOLE("all work items completed successfully"); } else { ZEN_CONSOLE("some work items failed 
(code {})", ReturnValue); } } ////////////////////////////////////////////////////////////////////////// // ExecBeaconSubCmd ExecBeaconSubCmd::ExecBeaconSubCmd(ExecCommand& Parent) : ZenSubCmdBase("beacon", "Execute via beacon/orchestrator discovery") , m_Parent(Parent) { SubOptions().add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), ""); SubOptions().add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), ""); } void ExecBeaconSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ZEN_ASSERT(m_Parent.m_ChunkResolver); ChunkResolver& Resolver = *m_Parent.m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); if (!m_OrchestratorUrl.empty()) { ZEN_CONSOLE_INFO("using orchestrator at {}", m_OrchestratorUrl); ComputeSession.SetOrchestratorEndpoint(m_OrchestratorUrl); ComputeSession.SetOrchestratorBasePath(TempPath); } else { ZEN_CONSOLE_INFO("note: using hard-coded local worker path"); ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); } Stopwatch ExecTimer; int ReturnValue = m_Parent.RunSession(ComputeSession, m_OrchestratorUrl); ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); if (!ReturnValue) { ZEN_CONSOLE("all work items completed successfully"); } else { ZEN_CONSOLE("some work items failed (code {})", ReturnValue); } } ////////////////////////////////////////////////////////////////////////// // ExecDumpSubCmd ExecDumpSubCmd::ExecDumpSubCmd(ExecCommand& Parent) : ZenSubCmdBase("dump", "Dump high level information about actions"), m_Parent(Parent) { } void ExecDumpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { std::atomic EmittedCount{0}; eastl::hash_map SeenAttachments; // Attachment CID -> count of references m_Parent.m_RecordingReader->IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { eastl::hash_map Attachments; 
uint64_t AttachmentBytes = 0; uint64_t UncompressedAttachmentBytes = 0; ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); IoBuffer AttachmentData = m_Parent.m_ChunkResolver->FindChunkByCid(AttachmentCid); IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); Attachments[AttachmentCid] = CompressedData; AttachmentBytes += CompressedData.GetCompressedSize(); UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) { ++Iter->second; } }); zen::ExtendableStringBuilder<1024> ObjStr; # if 0 zen::CompactBinaryToJson(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); # else zen::CompactBinaryToYaml(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", ActionId, Attachments.size(), AttachmentBytes, UncompressedAttachmentBytes, ObjStr); # endif ++EmittedCount; }, 1); ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); eastl::map> ReferenceHistogram; for (const auto& [K, V] : SeenAttachments) { if (V > 1) { ReferenceHistogram[V].push_back(K); } } for (const auto& [RefCount, Cids] : ReferenceHistogram) { ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); } } ////////////////////////////////////////////////////////////////////////// // ExecBuildlogSubCmd ExecBuildlogSubCmd::ExecBuildlogSubCmd(ExecCommand& Parent) : ZenSubCmdBase("buildlog", "Generate build actions log"), m_Parent(Parent) { } void ExecBuildlogSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { ZEN_ASSERT(m_Parent.m_ChunkResolver); ChunkResolver& Resolver = *m_Parent.m_ChunkResolver; if (std::filesystem::exists(m_Parent.m_RecordingLogPath)) { m_Parent.ThrowOptionError(fmt::format("recording log directory '{}' 
already exists!", m_Parent.m_RecordingLogPath)); } ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); ComputeSession.StartRecording(Resolver, m_Parent.m_RecordingLogPath); Stopwatch ExecTimer; int ReturnValue = m_Parent.RunSession(ComputeSession); ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); if (!ReturnValue) { ZEN_CONSOLE("all work items completed successfully"); } else { ZEN_CONSOLE("some work items failed (code {})", ReturnValue); } } ////////////////////////////////////////////////////////////////////////// // ExecCommand ExecCommand::ExecCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("replay", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), ""); m_Options.add_option("replay", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), ""); m_Options.add_option("replay", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), ""); m_Options.add_option("replay", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), ""); m_Options.add_option("replay", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), ""); m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), ""); m_Options.add_option("output", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), ""); m_Options.add_option("output", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), ""); m_Options.add_option("output", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), ""); m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value(m_SubCommand)->default_value(""), ""); m_Options.parse_positional({"subcommand"}); 
AddSubCommand(m_HttpSubCmd); AddSubCommand(m_InprocSubCmd); AddSubCommand(m_BeaconSubCmd); AddSubCommand(m_DumpSubCmd); AddSubCommand(m_BuildlogSubCmd); } ExecCommand::~ExecCommand() { } bool ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions) { if (m_RecordingPath.empty()) { ThrowOptionError("replay path is required!"); } m_VerboseLogging = GlobalOptions.IsVerbose; m_QuietLogging = m_Quiet && !m_VerboseLogging; // Gather information from recording path std::filesystem::path RecordingPath{m_RecordingPath}; if (!std::filesystem::is_directory(RecordingPath)) { ThrowOptionError("replay path should be a directory path!"); } else { if (std::filesystem::is_directory(RecordingPath / "cid")) { m_Reader = std::make_unique(RecordingPath); m_WorkerMap = m_Reader->ReadWorkers(); m_ChunkResolver = m_Reader.get(); m_RecordingReader = m_Reader.get(); } else { m_UeReader = std::make_unique(RecordingPath); m_WorkerMap = m_UeReader->ReadWorkers(); m_ChunkResolver = m_UeReader.get(); m_RecordingReader = m_UeReader.get(); } } ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); for (auto& Kv : m_WorkerMap) { CbObject WorkerDesc = Kv.second.GetObject(); const IoHash& WorkerId = Kv.first; RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); if (m_VerboseLogging) { zen::ExtendableStringBuilder<1024> ObjStr; # if 0 zen::CompactBinaryToJson(WorkerDesc, ObjStr); ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); # else zen::CompactBinaryToYaml(WorkerDesc, ObjStr); ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); # endif } } if (m_VerboseLogging) { EmitFunctionList(m_FunctionList); } return true; } int ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl) { ConsoleProgressMode ProgressMode = ConsoleProgressMode::Pretty; if (m_QuietLogging) { ProgressMode = ConsoleProgressMode::Quiet; } ExecSessionConfig Config{ .Resolver = *m_ChunkResolver, .RecordingReader 
= *m_RecordingReader, .WorkerMap = m_WorkerMap, .FunctionList = m_FunctionList, .OrchestratorUrl = OrchestratorUrl, .OutputPath = m_OutputPath, .Offset = m_Offset, .Stride = m_Stride, .Limit = m_Limit, .Verbose = m_VerboseLogging, .Quiet = m_QuietLogging, .DumpActions = m_DumpActions, .Binary = m_Binary, .ProgressMode = ProgressMode, }; ExecSessionRunner Runner(ComputeSession, Config); return Runner.Run(); } ////////////////////////////////////////////////////////////////////////// void ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) { const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); for (auto& Item : WorkerDesc["functions"sv]) { CbObjectView Function = Item.AsObjectView(); std::string_view FunctionName = Function["name"sv].AsString(); const Guid FunctionVersion = Function["version"sv].AsUuid(); m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, .FunctionVersion = FunctionVersion, .BuildSystemVersion = WorkerBuildSystemVersion, .WorkerId = WorkerId}); } } void ExecCommand::EmitFunctionList(const std::vector& FunctionList) { ZEN_CONSOLE("=== Known functions:\n==========================="); ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); for (const FunctionDefinition& Func : FunctionList) { ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); } ZEN_CONSOLE("==========================="); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES