// Copyright Epic Games, Inc. All Rights Reserved. #include "exec_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std::literals; namespace eastl { template<> struct hash : public zen::IoHash::Hasher { }; } // namespace eastl #if ZEN_WITH_COMPUTE_SERVICES namespace zen { ExecCommand::ExecCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), ""); m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), ""); m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), ""); m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), ""); m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), ""); m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), ""); m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), ""); m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), ""); m_Options.add_option("", "", "mode", "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", cxxopts::value(m_Mode)->default_value("http"), ""); m_Options .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), ""); m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), ""); m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), ""); m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), ""); m_Options.parse_positional("mode"); } ExecCommand::~ExecCommand() { } void ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { // Configure if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_RecordingPath.empty()) { throw OptionParseException("replay path is required!", m_Options.help()); } m_VerboseLogging = GlobalOptions.IsVerbose; m_QuietLogging = m_Quiet && !m_VerboseLogging; enum ExecMode { kHttp, kDirect, kInproc, kDump, kBeacon, kBuildLog } Mode; if (m_Mode == "http"sv) { Mode = kHttp; } else if (m_Mode == "direct"sv) { Mode = kDirect; } else if (m_Mode == "inproc"sv) { Mode = kInproc; } else if (m_Mode == "dump"sv) { Mode = kDump; } else if (m_Mode == "beacon"sv) { Mode = kBeacon; } else if (m_Mode == "buildlog"sv) { Mode = kBuildLog; } else { throw OptionParseException("invalid mode specified!", m_Options.help()); } // Gather information from recording path std::unique_ptr Reader; std::unique_ptr UeReader; std::filesystem::path RecordingPath{m_RecordingPath}; if (!std::filesystem::is_directory(RecordingPath)) { throw OptionParseException("replay path should be a directory path!", m_Options.help()); } else { if (std::filesystem::is_directory(RecordingPath / "cid")) { Reader = std::make_unique(RecordingPath); m_WorkerMap = Reader->ReadWorkers(); m_ChunkResolver = Reader.get(); m_RecordingReader = Reader.get(); } else { UeReader = std::make_unique(RecordingPath); m_WorkerMap = UeReader->ReadWorkers(); m_ChunkResolver = UeReader.get(); m_RecordingReader = UeReader.get(); } } ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); for (auto& Kv : m_WorkerMap) { CbObject WorkerDesc = Kv.second.GetObject(); const IoHash& WorkerId = Kv.first; RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); if (m_VerboseLogging) { zen::ExtendableStringBuilder<1024> ObjStr; # if 0 zen::CompactBinaryToJson(WorkerDesc, ObjStr); ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); # else zen::CompactBinaryToYaml(WorkerDesc, ObjStr); ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); # endif } } if (m_VerboseLogging) { EmitFunctionList(m_FunctionList); } // Iterate over work items and dispatch or log them int ReturnValue = 0; Stopwatch ExecTimer; switch (Mode) { case kHttp: // Forward requests to HTTP function service ReturnValue = HttpExecute(); break; case kDirect: // Not currently supported ReturnValue = LocalMessagingExecute(); break; case kInproc: // Handle execution in-core (by spawning child processes) ReturnValue = InProcessExecute(); break; case kDump: // Dump high level information about actions to console ReturnValue = DumpWorkItems(); break; case kBeacon: ReturnValue = BeaconExecute(); break; case kBuildLog: ReturnValue = BuildActionsLog(); break; default: ZEN_ERROR("Unknown operating mode! No work submitted"); ReturnValue = 1; } ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); if (!ReturnValue) { ZEN_CONSOLE("all work items completed successfully"); } else { ZEN_CONSOLE("some work items failed (code {})", ReturnValue); } } int ExecCommand::InProcessExecute() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; zen::compute::ComputeServiceSession ComputeSession(Resolver); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); ComputeSession.AddLocalRunner(Resolver, TempPath); return ExecUsingSession(ComputeSession); } int ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession) { struct JobTracker { public: inline void Insert(int LsnField) { RwLock::ExclusiveLockScope _(Lock); PendingJobs.insert(LsnField); } inline bool IsEmpty() const { RwLock::SharedLockScope _(Lock); return PendingJobs.empty(); } inline void Remove(int CompleteLsn) { RwLock::ExclusiveLockScope _(Lock); PendingJobs.erase(CompleteLsn); } inline size_t GetSize() const { RwLock::SharedLockScope _(Lock); return PendingJobs.size(); } private: mutable RwLock Lock; std::unordered_set PendingJobs; }; JobTracker PendingJobs; struct ActionSummaryEntry { int32_t Lsn = 0; int RecordingIndex = 0; IoHash ActionId; std::string FunctionName; int InputAttachments = 0; uint64_t InputBytes = 0; int OutputAttachments = 0; uint64_t OutputBytes = 0; float WallSeconds = 0.0f; float CpuSeconds = 0.0f; uint64_t SubmittedTicks = 0; uint64_t StartedTicks = 0; std::string ExecutionLocation; }; std::mutex SummaryLock; std::unordered_map SummaryEntries; ComputeSession.WaitUntilReady(); // Register as a client with the orchestrator (best-effort) std::string OrchestratorClientId; if (!m_OrchestratorUrl.empty()) { try { HttpClient OrchestratorClient(m_OrchestratorUrl); CbObjectWriter Ann; Ann << "session_id"sv << GetSessionId(); Ann << "hostname"sv << std::string_view(GetMachineName()); CbObjectWriter Meta; Meta << "source"sv << "zen-exec"sv; Ann << "metadata"sv << Meta.Save(); auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save()); if (Resp.IsSuccess()) { OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString()); ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId); } else { ZEN_WARN("failed to register with orchestrator (status {})", static_cast(Resp.StatusCode)); } } catch (const std::exception& Ex) { ZEN_WARN("failed to register with orchestrator: {}", Ex.what()); } } Stopwatch OrchestratorHeartbeatTimer; auto SendOrchestratorHeartbeat = [&] { if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000) { return; } OrchestratorHeartbeatTimer.Reset(); try { HttpClient OrchestratorClient(m_OrchestratorUrl); std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId)); } catch (...) { } }; auto ClientCleanup = MakeGuard([&] { if (!OrchestratorClientId.empty()) { try { HttpClient OrchestratorClient(m_OrchestratorUrl); std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId)); } catch (...) { } } }); // Create a queue to group all actions from this exec session CbObjectWriter Metadata; Metadata << "source"sv << "zen-exec"sv; auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save()); const int QueueId = QueueResult.QueueId; if (!QueueId) { ZEN_ERROR("failed to create compute queue"); return 1; } auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); }); if (!m_OutputPath.empty()) { zen::CreateDirectories(m_OutputPath); } std::atomic IsDraining{0}; auto DrainCompletedJobs = [&] { if (IsDraining.exchange(1)) { return; } auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); CbObjectWriter Cbo; ComputeSession.GetQueueCompleted(QueueId, Cbo); if (CbObject Completed = Cbo.Save()) { for (auto& It : Completed["completed"sv]) { int32_t CompleteLsn = It.AsInt32(); CbPackage ResultPackage; HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); if (Response == HttpResponseCode::OK) { if (!m_OutputPath.empty() && ResultPackage) { int OutputAttachments = 0; uint64_t OutputBytes = 0; if (!m_Binary) { // Write the root object as YAML ExtendableStringBuilder<4096> YamlStr; CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr); std::string_view Yaml = YamlStr; zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn), IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); // Write decompressed attachments auto Attachments = ResultPackage.GetAttachments(); if (!Attachments.empty()) { std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn); zen::CreateDirectories(AttDir); for (const CbAttachment& Att : Attachments) { ++OutputAttachments; IoHash AttHash = Att.GetHash(); if (Att.IsCompressedBinary()) { SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress(); OutputBytes += Decompressed.GetSize(); zen::WriteFile(AttDir / AttHash.ToHexString(), IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); } else { SharedBuffer Binary = Att.AsBinary(); OutputBytes += Binary.GetSize(); zen::WriteFile(AttDir / AttHash.ToHexString(), IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize())); } } } if (!m_QuietLogging) { ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)", m_OutputPath.string(), CompleteLsn, OutputAttachments); } } else { CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage); zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized)); for (const CbAttachment& Att : ResultPackage.GetAttachments()) { ++OutputAttachments; OutputBytes += Att.AsBinary().GetSize(); } if (!m_QuietLogging) { ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn); } } std::lock_guard Lock(SummaryLock); if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end()) { It2->second.OutputAttachments = OutputAttachments; It2->second.OutputBytes = OutputBytes; } } PendingJobs.Remove(CompleteLsn); ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); } } } }; // Describe workers ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); for (auto Kv : m_WorkerMap) { CbPackage WorkerDesc = Kv.second; ComputeSession.RegisterWorker(WorkerDesc); } // Then submit work items int FailedWorkCounter = 0; size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); int SubmittedWorkItems = 0; ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); int OffsetCounter = m_Offset; int StrideCounter = m_Stride; auto ShouldSchedule = [&]() -> bool { if (m_Limit && SubmittedWorkItems >= m_Limit) { // Limit reached, ignore return false; } if (OffsetCounter && OffsetCounter--) { // Still in offset, ignore return false; } if (--StrideCounter == 0) { StrideCounter = m_Stride; return true; } return false; }; int TargetParallelism = 8; if (OffsetCounter || StrideCounter || m_Limit) { TargetParallelism = 1; } std::atomic RecordingIndex{0}; m_RecordingReader->IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { // Enqueue job const int CurrentRecordingIndex = RecordingIndex++; Stopwatch SubmitTimer; const int Priority = 0; if (ShouldSchedule()) { if (m_VerboseLogging) { int AttachmentCount = 0; uint64_t AttachmentBytes = 0; eastl::hash_set ReferencedChunks; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachData = Field.AsAttachment(); ReferencedChunks.insert(AttachData); ++AttachmentCount; if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) { AttachmentBytes += ChunkData.GetSize(); } }); zen::ExtendableStringBuilder<1024> ObjStr; zen::CompactBinaryToJson(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); } if (m_DumpActions) { int AttachmentCount = 0; uint64_t AttachmentBytes = 0; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachData = Field.AsAttachment(); ++AttachmentCount; if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) { AttachmentBytes += ChunkData.GetSize(); } }); zen::ExtendableStringBuilder<1024> ObjStr; zen::CompactBinaryToYaml(ActionObject, ObjStr); ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); } if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult = ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority)) { const int32_t LsnField = EnqueueResult.Lsn; --RemainingWorkItems; ++SubmittedWorkItems; if (!m_QuietLogging) { ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", SubmittedWorkItems, LsnField, NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), RemainingWorkItems); } if (!m_OutputPath.empty()) { ActionSummaryEntry Entry; Entry.Lsn = LsnField; Entry.RecordingIndex = CurrentRecordingIndex; Entry.ActionId = ActionId; Entry.FunctionName = std::string(ActionObject["Function"sv].AsString()); if (!m_Binary) { // Write action object as YAML ExtendableStringBuilder<4096> YamlStr; CompactBinaryToYaml(ActionObject, YamlStr); std::string_view Yaml = YamlStr; zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField), IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); // Write decompressed input attachments std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField); bool AttDirCreated = false; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachCid = Field.AsAttachment(); ++Entry.InputAttachments; if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) { IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); SharedBuffer Decompressed = Compressed.Decompress(); Entry.InputBytes += Decompressed.GetSize(); if (!AttDirCreated) { zen::CreateDirectories(AttDir); AttDirCreated = true; } zen::WriteFile(AttDir / AttachCid.ToHexString(), IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); } }); if (!m_QuietLogging) { ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", m_OutputPath.string(), LsnField, Entry.InputAttachments); } } else { // Build a CbPackage from the action and write as .pkg CbPackage ActionPackage; ActionPackage.SetObject(ActionObject); ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachCid = Field.AsAttachment(); ++Entry.InputAttachments; if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) { IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); Entry.InputBytes += ChunkData.GetSize(); ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); } }); CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage); zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized)); if (!m_QuietLogging) { ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField); } } std::lock_guard Lock(SummaryLock); SummaryEntries.emplace(LsnField, std::move(Entry)); } PendingJobs.Insert(LsnField); } else { if (!m_QuietLogging) { std::string_view FunctionName = ActionObject["Function"sv].AsString(); const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); ZEN_ERROR( "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " "descriptor " "at: 'file://{}'", std::string(FunctionName), FunctionVersion, BuildSystemVersion, ""); EmitFunctionListOnce(m_FunctionList); } ++FailedWorkCounter; } } // Check for completed work DrainCompletedJobs(); SendOrchestratorHeartbeat(); }, TargetParallelism); // Wait until all pending work is complete while (!PendingJobs.IsEmpty()) { // TODO: improve this logic zen::Sleep(500); DrainCompletedJobs(); SendOrchestratorHeartbeat(); } // Merge timing data from queue history into summary entries if (!SummaryEntries.empty()) { // RunnerAction::State indices (can't include functionrunner.h from here) constexpr int kStateNew = 0; constexpr int kStatePending = 1; constexpr int kStateRunning = 3; constexpr int kStateCompleted = 4; // first terminal state constexpr int kStateCount = 8; for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0)) { std::lock_guard Lock(SummaryLock); if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end()) { // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled) uint64_t EndTick = 0; for (int S = kStateCompleted; S < kStateCount; ++S) { if (HistEntry.Timestamps[S] != 0) { EndTick = HistEntry.Timestamps[S]; break; } } uint64_t StartTick = HistEntry.Timestamps[kStateNew]; if (EndTick > StartTick) { It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond)); } It->second.CpuSeconds = HistEntry.CpuSeconds; It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending]; It->second.StartedTicks = HistEntry.Timestamps[kStateRunning]; It->second.ExecutionLocation = HistEntry.ExecutionLocation; } } } // Write summary file if output path is set if (!m_OutputPath.empty() && !SummaryEntries.empty()) { std::vector Sorted; Sorted.reserve(SummaryEntries.size()); for (auto& [_, Entry] : SummaryEntries) { Sorted.push_back(std::move(Entry)); } std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) { return A.RecordingIndex < B.RecordingIndex; }); auto FormatTimestamp = [](uint64_t Ticks) -> std::string { if (Ticks == 0) { return "-"; } return DateTime(Ticks).ToString("%H:%M:%S.%s"); }; ExtendableStringBuilder<4096> Summary; Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n", "LSN", "Index", "ActionId", "Function", "InAtt", "InBytes", "OutAtt", "OutBytes", "Wall(s)", "CPU(s)", "Submitted", "Started", "Location")); Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n", "", "", "", "", "", "", "", "", "", "", "", "", "")); for (const ActionSummaryEntry& Entry : Sorted) { Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n", Entry.Lsn, Entry.RecordingIndex, Entry.ActionId, Entry.FunctionName, Entry.InputAttachments, NiceBytes(Entry.InputBytes), Entry.OutputAttachments, NiceBytes(Entry.OutputBytes), Entry.WallSeconds, Entry.CpuSeconds, FormatTimestamp(Entry.SubmittedTicks), FormatTimestamp(Entry.StartedTicks), Entry.ExecutionLocation)); } std::filesystem::path SummaryPath = m_OutputPath / "summary.txt"; std::string_view SummaryStr = Summary; zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size())); ZEN_CONSOLE("wrote summary to {}", SummaryPath.string()); if (!m_Binary) { auto EscapeHtml = [](std::string_view Input) -> std::string { std::string Out; Out.reserve(Input.size()); for (char C : Input) { switch (C) { case '&': Out += "&"; break; case '<': Out += "<"; break; case '>': Out += ">"; break; case '"': Out += """; break; case '\'': Out += "'"; break; default: Out += C; } } return Out; }; auto EscapeJson = [](std::string_view Input) -> std::string { std::string Out; Out.reserve(Input.size()); for (char C : Input) { switch (C) { case '"': Out += "\\\""; break; case '\\': Out += "\\\\"; break; case '\n': Out += "\\n"; break; case '\r': Out += "\\r"; break; case '\t': Out += "\\t"; break; default: if (static_cast(C) < 0x20) { Out += fmt::format("\\u{:04x}", static_cast(static_cast(C))); } else { Out += C; } } } return Out; }; ExtendableStringBuilder<8192> Html; Html.Append(std::string_view(R"( Exec Summary

Exec Summary

LSN Index Action ID Function In Attachments In Bytes Out Attachments Out Bytes Wall(s) CPU(s) Submitted Started Location
)JS")); std::filesystem::path HtmlPath = m_OutputPath / "summary.html"; std::string_view HtmlStr = Html; zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size())); ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string()); } } if (FailedWorkCounter) { return 1; } return 0; } int ExecCommand::LocalMessagingExecute() { // Non-HTTP work submission path // To be reimplemented using final transport return 0; } ////////////////////////////////////////////////////////////////////////// int ExecCommand::HttpExecute() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName); return ExecUsingSession(ComputeSession); } int ExecCommand::BeaconExecute() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); if (!m_OrchestratorUrl.empty()) { ZEN_CONSOLE_INFO("using orchestrator at {}", m_OrchestratorUrl); ComputeSession.SetOrchestratorEndpoint(m_OrchestratorUrl); ComputeSession.SetOrchestratorBasePath(TempPath); } else { ZEN_CONSOLE_INFO("note: using hard-coded local worker path"); ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); } return ExecUsingSession(ComputeSession); } ////////////////////////////////////////////////////////////////////////// void ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) { const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); for (auto& Item : WorkerDesc["functions"sv]) { CbObjectView Function = Item.AsObjectView(); std::string_view FunctionName = Function["name"sv].AsString(); const Guid FunctionVersion = Function["version"sv].AsUuid(); m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, .FunctionVersion = FunctionVersion, .BuildSystemVersion = WorkerBuildSystemVersion, .WorkerId = WorkerId}); } } void ExecCommand::EmitFunctionListOnce(const std::vector& FunctionList) { if (m_FunctionListEmittedOnce == false) { EmitFunctionList(FunctionList); m_FunctionListEmittedOnce = true; } } int ExecCommand::DumpWorkItems() { std::atomic EmittedCount{0}; eastl::hash_map SeenAttachments; // Attachment CID -> count of references m_RecordingReader->IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { eastl::hash_map Attachments; uint64_t AttachmentBytes = 0; uint64_t UncompressedAttachmentBytes = 0; ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid); IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); Attachments[AttachmentCid] = CompressedData; AttachmentBytes += CompressedData.GetCompressedSize(); UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) { ++Iter->second; } }); zen::ExtendableStringBuilder<1024> ObjStr; # if 0 zen::CompactBinaryToJson(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); # else zen::CompactBinaryToYaml(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", ActionId, Attachments.size(), AttachmentBytes, UncompressedAttachmentBytes, ObjStr); # endif ++EmittedCount; }, 1); ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); eastl::map> ReferenceHistogram; for (const auto& [K, V] : SeenAttachments) { if (V > 1) { ReferenceHistogram[V].push_back(K); } } for (const auto& [RefCount, Cids] : ReferenceHistogram) { ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); } return 0; } ////////////////////////////////////////////////////////////////////////// int ExecCommand::BuildActionsLog() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; if (m_RecordingPath.empty()) { throw OptionParseException("need to specify recording path", m_Options.help()); } if (std::filesystem::exists(m_RecordingLogPath)) { throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help()); } ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); ComputeSession.StartRecording(Resolver, m_RecordingLogPath); return ExecUsingSession(ComputeSession); } void ExecCommand::EmitFunctionList(const std::vector& FunctionList) { ZEN_CONSOLE("=== Known functions:\n==========================="); ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); for (const FunctionDefinition& Func : FunctionList) { ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); } ZEN_CONSOLE("==========================="); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES