diff options
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/exec_cmd.cpp | 1374 |
1 files changed, 1374 insertions, 0 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp new file mode 100644 index 000000000..42c7119e7 --- /dev/null +++ b/src/zen/cmds/exec_cmd.cpp @@ -0,0 +1,1374 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "exec_cmd.h" + +#include <zencompute/computeservice.h> +#include <zencompute/recordingreader.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/session.h> +#include <zencore/stream.h> +#include <zencore/string.h> +#include <zencore/system.h> +#include <zencore/timer.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> + +#include <EASTL/hash_map.h> +#include <EASTL/hash_set.h> +#include <EASTL/map.h> + +using namespace std::literals; + +namespace eastl { + +template<> +struct hash<zen::IoHash> : public zen::IoHash::Hasher +{ +}; + +} // namespace eastl + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen { + +ExecCommand::ExecCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), "<hosturl>"); + m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>"); + m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>"); + m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>"); + m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>"); + m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>"); + m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), "<url>"); + m_Options.add_option("", + "", + "mode", + "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", + cxxopts::value(m_Mode)->default_value("http"), + "<string>"); + m_Options + .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), "<bool>"); + m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), "<path>"); + m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), "<bool>"); + m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>"); + m_Options.parse_positional("mode"); +} + +ExecCommand::~ExecCommand() +{ +} + +void +ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + // Configure + + if (!ParseOptions(argc, argv)) + { + return; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_RecordingPath.empty()) + { + throw OptionParseException("replay path is required!", m_Options.help()); + } + + m_VerboseLogging = GlobalOptions.IsVerbose; + m_QuietLogging = m_Quiet && !m_VerboseLogging; + + enum ExecMode + { + kHttp, + kDirect, + kInproc, + kDump, + kBeacon, + kBuildLog + } Mode; + + if (m_Mode == "http"sv) + { + Mode = kHttp; + } + else if (m_Mode == "direct"sv) + { + Mode = kDirect; + } + else if (m_Mode == "inproc"sv) + { + Mode = kInproc; + } + else if (m_Mode == "dump"sv) + { + Mode = kDump; + } + else if (m_Mode == "beacon"sv) + { + Mode = kBeacon; + } + else if (m_Mode == "buildlog"sv) + { + Mode = kBuildLog; + } + else + { + throw OptionParseException("invalid mode specified!", m_Options.help()); + } + + // Gather information from recording path + + std::unique_ptr<zen::compute::RecordingReader> Reader; + std::unique_ptr<zen::compute::UeRecordingReader> UeReader; + + std::filesystem::path RecordingPath{m_RecordingPath}; + + if (!std::filesystem::is_directory(RecordingPath)) + { + throw OptionParseException("replay path should be a directory path!", m_Options.help()); + } + else + { + if (std::filesystem::is_directory(RecordingPath / "cid")) + { + Reader = std::make_unique<zen::compute::RecordingReader>(RecordingPath); + m_WorkerMap = Reader->ReadWorkers(); + m_ChunkResolver = Reader.get(); + m_RecordingReader = Reader.get(); + } + else + { + UeReader = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath); + m_WorkerMap = UeReader->ReadWorkers(); + m_ChunkResolver = UeReader.get(); + m_RecordingReader = UeReader.get(); + } + } + + ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); + + for (auto& Kv : m_WorkerMap) + { + CbObject WorkerDesc = Kv.second.GetObject(); + const IoHash& WorkerId = Kv.first; + + RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); + + if (m_VerboseLogging) + { + zen::ExtendableStringBuilder<1024> ObjStr; +# if 0 + zen::CompactBinaryToJson(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); +# else + zen::CompactBinaryToYaml(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); +# endif + } + } + + if (m_VerboseLogging) + { + EmitFunctionList(m_FunctionList); + } + + // Iterate over work items and dispatch or log them + + int ReturnValue = 0; + + Stopwatch ExecTimer; + + switch (Mode) + { + case kHttp: + // Forward requests to HTTP function service + ReturnValue = HttpExecute(); + break; + + case kDirect: + // Not currently supported + ReturnValue = LocalMessagingExecute(); + break; + + case kInproc: + // Handle execution in-core (by spawning child processes) + ReturnValue = InProcessExecute(); + break; + + case kDump: + // Dump high level information about actions to console + ReturnValue = DumpWorkItems(); + break; + + case kBeacon: + ReturnValue = BeaconExecute(); + break; + + case kBuildLog: + ReturnValue = BuildActionsLog(); + break; + + default: + ZEN_ERROR("Unknown operating mode! No work submitted"); + + ReturnValue = 1; + } + + ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); + + if (!ReturnValue) + { + ZEN_CONSOLE("all work items completed successfully"); + } + else + { + ZEN_CONSOLE("some work items failed (code {})", ReturnValue); + } +} + +int +ExecCommand::InProcessExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + zen::compute::ComputeServiceSession ComputeSession(Resolver); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + ComputeSession.AddLocalRunner(Resolver, TempPath); + + return ExecUsingSession(ComputeSession); +} + +int +ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession) +{ + struct JobTracker + { + public: + inline void Insert(int LsnField) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.insert(LsnField); + } + + inline bool IsEmpty() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.empty(); + } + + inline void Remove(int CompleteLsn) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.erase(CompleteLsn); + } + + inline size_t GetSize() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.size(); + } + + private: + mutable RwLock Lock; + std::unordered_set<int> PendingJobs; + }; + + JobTracker PendingJobs; + + struct ActionSummaryEntry + { + int32_t Lsn = 0; + int RecordingIndex = 0; + IoHash ActionId; + std::string FunctionName; + int InputAttachments = 0; + uint64_t InputBytes = 0; + int OutputAttachments = 0; + uint64_t OutputBytes = 0; + float WallSeconds = 0.0f; + float CpuSeconds = 0.0f; + uint64_t SubmittedTicks = 0; + uint64_t StartedTicks = 0; + std::string ExecutionLocation; + }; + + std::mutex SummaryLock; + std::unordered_map<int32_t, ActionSummaryEntry> SummaryEntries; + + ComputeSession.WaitUntilReady(); + + // Register as a client with the orchestrator (best-effort) + + std::string OrchestratorClientId; + + if (!m_OrchestratorUrl.empty()) + { + try + { + HttpClient OrchestratorClient(m_OrchestratorUrl); + + CbObjectWriter Ann; + Ann << "session_id"sv << GetSessionId(); + Ann << "hostname"sv << std::string_view(GetMachineName()); + + CbObjectWriter Meta; + Meta << "source"sv + << "zen-exec"sv; + Ann << "metadata"sv << Meta.Save(); + + auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save()); + if (Resp.IsSuccess()) + { + OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString()); + ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId); + } + else + { + ZEN_WARN("failed to register with orchestrator (status {})", static_cast<int>(Resp.StatusCode)); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("failed to register with orchestrator: {}", Ex.what()); + } + } + + Stopwatch OrchestratorHeartbeatTimer; + + auto SendOrchestratorHeartbeat = [&] { + if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000) + { + return; + } + OrchestratorHeartbeatTimer.Reset(); + try + { + HttpClient OrchestratorClient(m_OrchestratorUrl); + std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId)); + } + catch (...) + { + } + }; + + auto ClientCleanup = MakeGuard([&] { + if (!OrchestratorClientId.empty()) + { + try + { + HttpClient OrchestratorClient(m_OrchestratorUrl); + std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId)); + } + catch (...) + { + } + } + }); + + // Create a queue to group all actions from this exec session + + CbObjectWriter Metadata; + Metadata << "source"sv + << "zen-exec"sv; + + auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save()); + const int QueueId = QueueResult.QueueId; + if (!QueueId) + { + ZEN_ERROR("failed to create compute queue"); + return 1; + } + + auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); }); + + if (!m_OutputPath.empty()) + { + zen::CreateDirectories(m_OutputPath); + } + + std::atomic<int> IsDraining{0}; + + auto DrainCompletedJobs = [&] { + if (IsDraining.exchange(1)) + { + return; + } + + auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); + + CbObjectWriter Cbo; + ComputeSession.GetQueueCompleted(QueueId, Cbo); + + if (CbObject Completed = Cbo.Save()) + { + for (auto& It : Completed["completed"sv]) + { + int32_t CompleteLsn = It.AsInt32(); + + CbPackage ResultPackage; + HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); + + if (Response == HttpResponseCode::OK) + { + if (!m_OutputPath.empty() && ResultPackage) + { + int OutputAttachments = 0; + uint64_t OutputBytes = 0; + + if (!m_Binary) + { + // Write the root object as YAML + ExtendableStringBuilder<4096> YamlStr; + CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr); + + std::string_view Yaml = YamlStr; + zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn), + IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); + + // Write decompressed attachments + auto Attachments = ResultPackage.GetAttachments(); + + if (!Attachments.empty()) + { + std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn); + zen::CreateDirectories(AttDir); + + for (const CbAttachment& Att : Attachments) + { + ++OutputAttachments; + + IoHash AttHash = Att.GetHash(); + + if (Att.IsCompressedBinary()) + { + SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress(); + OutputBytes += Decompressed.GetSize(); + zen::WriteFile(AttDir / AttHash.ToHexString(), + IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); + } + else + { + SharedBuffer Binary = Att.AsBinary(); + OutputBytes += Binary.GetSize(); + zen::WriteFile(AttDir / AttHash.ToHexString(), + IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize())); + } + } + } + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)", + m_OutputPath.string(), + CompleteLsn, + OutputAttachments); + } + } + else + { + CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage); + zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized)); + + for (const CbAttachment& Att : ResultPackage.GetAttachments()) + { + ++OutputAttachments; + OutputBytes += Att.AsBinary().GetSize(); + } + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn); + } + } + + std::lock_guard Lock(SummaryLock); + if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end()) + { + It2->second.OutputAttachments = OutputAttachments; + It2->second.OutputBytes = OutputBytes; + } + } + + PendingJobs.Remove(CompleteLsn); + + ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); + } + } + } + }; + + // Describe workers + + ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); + + for (auto Kv : m_WorkerMap) + { + CbPackage WorkerDesc = Kv.second; + + ComputeSession.RegisterWorker(WorkerDesc); + } + + // Then submit work items + + int FailedWorkCounter = 0; + size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); + int SubmittedWorkItems = 0; + + ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + + int OffsetCounter = m_Offset; + int StrideCounter = m_Stride; + + auto ShouldSchedule = [&]() -> bool { + if (m_Limit && SubmittedWorkItems >= m_Limit) + { + // Limit reached, ignore + + return false; + } + + if (OffsetCounter && OffsetCounter--) + { + // Still in offset, ignore + + return false; + } + + if (--StrideCounter == 0) + { + StrideCounter = m_Stride; + + return true; + } + + return false; + }; + + int TargetParallelism = 8; + + if (OffsetCounter || StrideCounter || m_Limit) + { + TargetParallelism = 1; + } + + std::atomic<int> RecordingIndex{0}; + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + // Enqueue job + + const int CurrentRecordingIndex = RecordingIndex++; + + Stopwatch SubmitTimer; + + const int Priority = 0; + + if (ShouldSchedule()) + { + if (m_VerboseLogging) + { + int AttachmentCount = 0; + uint64_t AttachmentBytes = 0; + eastl::hash_set<IoHash> ReferencedChunks; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsAttachment(); + + ReferencedChunks.insert(AttachData); + ++AttachmentCount; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) + { + AttachmentBytes += ChunkData.GetSize(); + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", + ActionId, + AttachmentCount, + NiceBytes(AttachmentBytes), + ObjStr); + } + + if (m_DumpActions) + { + int AttachmentCount = 0; + uint64_t AttachmentBytes = 0; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsAttachment(); + + ++AttachmentCount; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) + { + AttachmentBytes += ChunkData.GetSize(); + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + zen::CompactBinaryToYaml(ActionObject, ObjStr); + ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); + } + + if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult = + ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority)) + { + const int32_t LsnField = EnqueueResult.Lsn; + + --RemainingWorkItems; + ++SubmittedWorkItems; + + if (!m_QuietLogging) + { + ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", + SubmittedWorkItems, + LsnField, + NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), + RemainingWorkItems); + } + + if (!m_OutputPath.empty()) + { + ActionSummaryEntry Entry; + Entry.Lsn = LsnField; + Entry.RecordingIndex = CurrentRecordingIndex; + Entry.ActionId = ActionId; + Entry.FunctionName = std::string(ActionObject["Function"sv].AsString()); + + if (!m_Binary) + { + // Write action object as YAML + ExtendableStringBuilder<4096> YamlStr; + CompactBinaryToYaml(ActionObject, YamlStr); + + std::string_view Yaml = YamlStr; + zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField), + IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); + + // Write decompressed input attachments + std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField); + bool AttDirCreated = false; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachCid = Field.AsAttachment(); + ++Entry.InputAttachments; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); + SharedBuffer Decompressed = Compressed.Decompress(); + + Entry.InputBytes += Decompressed.GetSize(); + + if (!AttDirCreated) + { + zen::CreateDirectories(AttDir); + AttDirCreated = true; + } + + zen::WriteFile(AttDir / AttachCid.ToHexString(), + IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); + } + }); + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", + m_OutputPath.string(), + LsnField, + Entry.InputAttachments); + } + } + else + { + // Build a CbPackage from the action and write as .pkg + CbPackage ActionPackage; + ActionPackage.SetObject(ActionObject); + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachCid = Field.AsAttachment(); + ++Entry.InputAttachments; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); + + Entry.InputBytes += ChunkData.GetSize(); + ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + } + }); + + CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage); + zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized)); + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField); + } + } + + std::lock_guard Lock(SummaryLock); + SummaryEntries.emplace(LsnField, std::move(Entry)); + } + + PendingJobs.Insert(LsnField); + } + else + { + if (!m_QuietLogging) + { + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + ZEN_ERROR( + "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " + "descriptor " + "at: 'file://{}'", + std::string(FunctionName), + FunctionVersion, + BuildSystemVersion, + "<null>"); + + EmitFunctionListOnce(m_FunctionList); + } + + ++FailedWorkCounter; + } + } + + // Check for completed work + + DrainCompletedJobs(); + SendOrchestratorHeartbeat(); + }, + TargetParallelism); + + // Wait until all pending work is complete + + while (!PendingJobs.IsEmpty()) + { + // TODO: improve this logic + zen::Sleep(500); + + DrainCompletedJobs(); + SendOrchestratorHeartbeat(); + } + + // Merge timing data from queue history into summary entries + + if (!SummaryEntries.empty()) + { + // RunnerAction::State indices (can't include functionrunner.h from here) + constexpr int kStateNew = 0; + constexpr int kStatePending = 1; + constexpr int kStateRunning = 3; + constexpr int kStateCompleted = 4; // first terminal state + constexpr int kStateCount = 8; + + for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0)) + { + std::lock_guard Lock(SummaryLock); + if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end()) + { + // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled) + uint64_t EndTick = 0; + for (int S = kStateCompleted; S < kStateCount; ++S) + { + if (HistEntry.Timestamps[S] != 0) + { + EndTick = HistEntry.Timestamps[S]; + break; + } + } + uint64_t StartTick = HistEntry.Timestamps[kStateNew]; + if (EndTick > StartTick) + { + It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond)); + } + It->second.CpuSeconds = HistEntry.CpuSeconds; + It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending]; + It->second.StartedTicks = HistEntry.Timestamps[kStateRunning]; + It->second.ExecutionLocation = HistEntry.ExecutionLocation; + } + } + } + + // Write summary file if output path is set + + if (!m_OutputPath.empty() && !SummaryEntries.empty()) + { + std::vector<ActionSummaryEntry> Sorted; + Sorted.reserve(SummaryEntries.size()); + for (auto& [_, Entry] : SummaryEntries) + { + Sorted.push_back(std::move(Entry)); + } + + std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) { + return A.RecordingIndex < B.RecordingIndex; + }); + + auto FormatTimestamp = [](uint64_t Ticks) -> std::string { + if (Ticks == 0) + { + return "-"; + } + return DateTime(Ticks).ToString("%H:%M:%S.%s"); + }; + + ExtendableStringBuilder<4096> Summary; + Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n", + "LSN", + "Index", + "ActionId", + "Function", + "InAtt", + "InBytes", + "OutAtt", + "OutBytes", + "Wall(s)", + "CPU(s)", + "Submitted", + "Started", + "Location")); + Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "")); + + for (const ActionSummaryEntry& Entry : Sorted) + { + Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n", + Entry.Lsn, + Entry.RecordingIndex, + Entry.ActionId, + Entry.FunctionName, + Entry.InputAttachments, + NiceBytes(Entry.InputBytes), + Entry.OutputAttachments, + NiceBytes(Entry.OutputBytes), + Entry.WallSeconds, + Entry.CpuSeconds, + FormatTimestamp(Entry.SubmittedTicks), + FormatTimestamp(Entry.StartedTicks), + Entry.ExecutionLocation)); + } + + std::filesystem::path SummaryPath = m_OutputPath / "summary.txt"; + std::string_view SummaryStr = Summary; + zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size())); + + ZEN_CONSOLE("wrote summary to {}", SummaryPath.string()); + + if (!m_Binary) + { + auto EscapeHtml = [](std::string_view Input) -> std::string { + std::string Out; + Out.reserve(Input.size()); + for (char C : Input) + { + switch (C) + { + case '&': + Out += "&"; + break; + case '<': + Out += "<"; + break; + case '>': + Out += ">"; + break; + case '"': + Out += """; + break; + case '\'': + Out += "'"; + break; + default: + Out += C; + } + } + return Out; + }; + + auto EscapeJson = [](std::string_view Input) -> std::string { + std::string Out; + Out.reserve(Input.size()); + for (char C : Input) + { + switch (C) + { + case '"': + Out += "\\\""; + break; + case '\\': + Out += "\\\\"; + break; + case '\n': + Out += "\\n"; + break; + case '\r': + Out += "\\r"; + break; + case '\t': + Out += "\\t"; + break; + default: + if (static_cast<unsigned char>(C) < 0x20) + { + Out += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(C))); + } + else + { + Out += C; + } + } + } + return Out; + }; + + ExtendableStringBuilder<8192> Html; + + Html.Append(std::string_view(R"(<!DOCTYPE html> +<html><head><meta charset="utf-8"><title>Exec Summary</title> +<style> +body{font-family:system-ui,sans-serif;margin:20px;background:#fafafa} +#container{overflow-y:auto;height:calc(100vh - 120px)} +table{border-collapse:collapse;width:100%} +th,td{border:1px solid #ddd;padding:6px 10px;text-align:left;white-space:nowrap} +th{background:#f0f0f0;cursor:pointer;user-select:none;position:sticky;top:0;z-index:1} +th:hover{background:#e0e0e0} +th .arrow{font-size:0.7em;margin-left:4px} +tr:hover{background:#e8f0fe} +input{padding:6px 10px;margin-bottom:12px;width:300px;border:1px solid #ccc;border-radius:4px} +button{padding:6px 14px;margin-left:8px;margin-bottom:12px;border:1px solid #ccc;border-radius:4px;background:#f0f0f0;cursor:pointer} +button:hover{background:#e0e0e0} +a{color:#1a73e8;text-decoration:none} +a:hover{text-decoration:underline} +.num{text-align:right} +</style></head><body> +<h2>Exec Summary</h2> +<input type="text" id="filter" placeholder="Filter by function name..."><button id="csvBtn">Export CSV</button> +<div id="container"> +<table><thead><tr> +<th data-col="0">LSN <span class="arrow"></span></th> +<th data-col="1">Index <span class="arrow"></span></th> +<th data-col="2">Action ID <span class="arrow"></span></th> +<th data-col="3">Function <span class="arrow"></span></th> +<th data-col="4">In Attachments <span class="arrow"></span></th> +<th data-col="5">In Bytes <span class="arrow"></span></th> +<th data-col="6">Out Attachments <span class="arrow"></span></th> +<th data-col="7">Out Bytes <span class="arrow"></span></th> +<th data-col="8">Wall(s) <span class="arrow"></span></th> +<th data-col="9">CPU(s) <span class="arrow"></span></th> +<th data-col="10">Submitted <span class="arrow"></span></th> +<th data-col="11">Started <span class="arrow"></span></th> +<th data-col="12">Location <span class="arrow"></span></th> +</tr></thead><tbody> +<tr id="spacerTop"><td colspan="13"></td></tr> +<tr id="spacerBot"><td colspan="13"></td></tr> +</tbody></table></div> +<script> +const DATA=[ +)")); + + std::string_view ResultExt = ".result.yaml"; + std::string_view ActionExt = ".action.yaml"; + + for (const ActionSummaryEntry& Entry : Sorted) + { + std::string SafeName = EscapeJson(EscapeHtml(Entry.FunctionName)); + std::string ActionIdStr = Entry.ActionId.ToHexString(); + std::string ActionLink; + if (!ActionExt.empty()) + { + ActionLink = EscapeJson(fmt::format(" <a href=\"{}{}\">[action]</a>", Entry.Lsn, ActionExt)); + } + + // Indices: 0=lsn, 1=idx, 2=actionId, 3=fn, 4=inAtt, 5=inBytes, 6=outAtt, 7=outBytes, + // 8=wall, 9=cpu, 10=niceBytesIn, 11=niceBytesOut, 12=actionLink, + // 13=submittedTicks, 14=startedTicks, 15=submittedDisplay, 16=startedDisplay, + // 17=location + Html.Append( + fmt::format("[{},{},\"{}\",\"{}\",{},{},{},{},{:.6f},{:.6f},\"{}\",\"{}\",\"{}\",{},{},\"{}\",\"{}\",\"{}\"],\n", + Entry.Lsn, + Entry.RecordingIndex, + ActionIdStr, + SafeName, + Entry.InputAttachments, + Entry.InputBytes, + Entry.OutputAttachments, + Entry.OutputBytes, + Entry.WallSeconds, + Entry.CpuSeconds, + EscapeJson(NiceBytes(Entry.InputBytes)), + EscapeJson(NiceBytes(Entry.OutputBytes)), + ActionLink, + Entry.SubmittedTicks, + Entry.StartedTicks, + FormatTimestamp(Entry.SubmittedTicks), + FormatTimestamp(Entry.StartedTicks), + EscapeJson(EscapeHtml(Entry.ExecutionLocation)))); + } + + Html.Append(fmt::format(R"(]; +const RESULT_EXT="{}"; +)", + ResultExt)); + + Html.Append(std::string_view(R"JS((function(){ +const ROW_H=33,BUF=20; +const container=document.getElementById("container"); +const tbody=container.querySelector("tbody"); +const headers=container.querySelectorAll("th"); +const filterInput=document.getElementById("filter"); +const spacerTop=document.getElementById("spacerTop"); +const spacerBot=document.getElementById("spacerBot"); +let view=[...DATA.keys()]; +let sortCol=-1,sortAsc=true; +const COLS=[ + {f:0,t:"n"},{f:1,t:"n"},{f:2,t:"s"},{f:3,t:"s"}, + {f:4,t:"n"},{f:5,t:"n"},{f:6,t:"n"},{f:7,t:"n"}, + {f:8,t:"n"},{f:9,t:"n"},{f:13,t:"n"},{f:14,t:"n"},{f:17,t:"s"} +]; +function rowHtml(i){ + const d=DATA[view[i]]; + const bg=i%2?' style="background:#f9f9f9"':''; + return '<tr'+bg+'>'+ + '<td class="num"><a href="'+d[0]+RESULT_EXT+'">'+d[0]+'</a></td>'+ + '<td class="num">'+d[1]+'</td>'+ + '<td><code>'+d[2]+'</code></td>'+ + '<td>'+d[3]+d[12]+'</td>'+ + '<td class="num">'+d[4]+'</td>'+ + '<td class="num">'+d[10]+'</td>'+ + '<td class="num">'+d[6]+'</td>'+ + '<td class="num">'+d[11]+'</td>'+ + '<td class="num">'+d[8].toFixed(2)+'</td>'+ + '<td class="num">'+d[9].toFixed(2)+'</td>'+ + '<td class="num">'+d[15]+'</td>'+ + '<td class="num">'+d[16]+'</td>'+ + '<td>'+d[17]+'</td></tr>'; +} +let lastFirst=-1,lastLast=-1; +function render(){ + const scrollTop=container.scrollTop; + const viewH=container.clientHeight; + let first=Math.floor(scrollTop/ROW_H)-BUF; + let last=Math.ceil((scrollTop+viewH)/ROW_H)+BUF; + if(first<0) first=0; + if(last>=view.length) last=view.length-1; + if(first===lastFirst&&last===lastLast) return; + lastFirst=first;lastLast=last; + const rows=[]; + for(let i=first;i<=last;i++) rows.push(rowHtml(i)); + spacerTop.style.height=(first*ROW_H)+'px'; + spacerBot.style.height=((view.length-1-last)*ROW_H)+'px'; + const mid=rows.join(''); + const topTr='<tr id="spacerTop"><td colspan="13" style="border:0;padding:0;height:'+spacerTop.style.height+'"></td></tr>'; + const botTr='<tr id="spacerBot"><td colspan="13" style="border:0;padding:0;height:'+spacerBot.style.height+'"></td></tr>'; + tbody.innerHTML=topTr+mid+botTr; +} +function applySort(){ + if(sortCol<0) return; + const c=COLS[sortCol]; + view.sort((a,b)=>{ + const va=DATA[a][c.f],vb=DATA[b][c.f]; + if(c.t==="n") return sortAsc?va-vb:vb-va; + return sortAsc?(va<vb?-1:va>vb?1:0):(va>vb?-1:va<vb?1:0); + }); +} +function rebuild(){ + const q=filterInput.value.toLowerCase(); + view=[]; + for(let i=0;i<DATA.length;i++){ + if(!q||DATA[i][3].toLowerCase().includes(q)) view.push(i); + } + applySort(); + lastFirst=lastLast=-1; + render(); +} +headers.forEach(th=>{ + th.addEventListener("click",()=>{ + const col=parseInt(th.dataset.col); + if(sortCol===col){sortAsc=!sortAsc}else{sortCol=col;sortAsc=true} + headers.forEach(h=>h.querySelector(".arrow").textContent=""); + th.querySelector(".arrow").textContent=sortAsc?"\u25B2":"\u25BC"; + applySort(); + lastFirst=lastLast=-1; + render(); + }); +}); +filterInput.addEventListener("input",()=>rebuild()); +let ticking=false; +container.addEventListener("scroll",()=>{ + if(!ticking){ticking=true;requestAnimationFrame(()=>{render();ticking=false})} +}); +rebuild(); +document.getElementById("csvBtn").addEventListener("click",()=>{ + const H=["LSN","Index","Action ID","Function","In Attachments","In Bytes","Out Attachments","Out Bytes","Wall(s)","CPU(s)","Submitted","Started","Location"]; + const esc=v=>{const s=String(v);return s.includes(',')||s.includes('"')||s.includes('\n')?'"'+s.replace(/"/g,'""')+'"':s}; + const rows=[H.join(",")]; + for(let i=0;i<view.length;i++){ + const d=DATA[view[i]]; + rows.push([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7],d[8],d[9],d[15],d[16],d[17]].map(esc).join(",")); + } + const blob=new Blob([rows.join("\n")],{type:"text/csv"}); + const a=document.createElement("a"); + a.href=URL.createObjectURL(blob); + a.download="summary.csv"; + a.click(); + URL.revokeObjectURL(a.href); +}); +})(); +</script></body></html> +)JS")); + + std::filesystem::path HtmlPath = m_OutputPath / "summary.html"; + std::string_view HtmlStr = Html; + zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size())); + + ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string()); + } + } + + if (FailedWorkCounter) + { + return 1; + } + + return 0; +} + +int +ExecCommand::LocalMessagingExecute() +{ + // Non-HTTP work submission path + + // To be reimplemented using final transport + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::HttpExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::ComputeServiceSession ComputeSession(Resolver); + ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName); + + return ExecUsingSession(ComputeSession); +} + +int +ExecCommand::BeaconExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::ComputeServiceSession ComputeSession(Resolver); + + if (!m_OrchestratorUrl.empty()) + { + ZEN_CONSOLE_INFO("using orchestrator at {}", m_OrchestratorUrl); + ComputeSession.SetOrchestratorEndpoint(m_OrchestratorUrl); + ComputeSession.SetOrchestratorBasePath(TempPath); + } + else + { + ZEN_CONSOLE_INFO("note: using hard-coded local worker path"); + ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); + } + + return ExecUsingSession(ComputeSession); +} + +////////////////////////////////////////////////////////////////////////// + +void +ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) +{ + const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerDesc["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } +} + +void +ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList) +{ + if (m_FunctionListEmittedOnce == false) + { + EmitFunctionList(FunctionList); + + m_FunctionListEmittedOnce = true; + } +} + +int +ExecCommand::DumpWorkItems() +{ + std::atomic<int> EmittedCount{0}; + + eastl::hash_map<IoHash, uint64_t> SeenAttachments; // Attachment CID -> count of references + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + eastl::hash_map<IoHash, CompressedBuffer> Attachments; + + uint64_t AttachmentBytes = 0; + uint64_t UncompressedAttachmentBytes = 0; + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + Attachments[AttachmentCid] = CompressedData; + + AttachmentBytes += CompressedData.GetCompressedSize(); + UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); + + if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) + { + ++Iter->second; + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + +# if 0 + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); +# else + zen::CompactBinaryToYaml(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", + ActionId, + Attachments.size(), + AttachmentBytes, + UncompressedAttachmentBytes, + ObjStr); +# endif + + ++EmittedCount; + }, + 1); + + ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); + + eastl::map<uint64_t, std::vector<IoHash>> ReferenceHistogram; + + for (const auto& [K, V] : SeenAttachments) + { + if (V > 1) + { + ReferenceHistogram[V].push_back(K); + } + } + + for (const auto& [RefCount, Cids] : ReferenceHistogram) + { + ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); + } + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::BuildActionsLog() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + if (m_RecordingPath.empty()) + { + throw OptionParseException("need to specify recording path", m_Options.help()); + } + + if (std::filesystem::exists(m_RecordingLogPath)) + { + throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help()); + } + + ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::ComputeServiceSession ComputeSession(Resolver); + ComputeSession.StartRecording(Resolver, m_RecordingLogPath); + + return ExecUsingSession(ComputeSession); +} + +void +ExecCommand::EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList) +{ + ZEN_CONSOLE("=== Known functions:\n==========================="); + + ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); + + for (const FunctionDefinition& Func : FunctionList) + { + ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); + } + + ZEN_CONSOLE("==========================="); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES |