diff options
| author | Stefan Boberg <[email protected]> | 2026-03-04 14:13:46 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-04 14:13:46 +0100 |
| commit | 0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch) | |
| tree | 074575ba6ea259044a179eab0bb396d37268fb09 /src/zen/cmds/exec_cmd.cpp | |
| parent | native xmake toolchain definition for UE-clang (#805) (diff) | |
| download | archived-zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz archived-zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip | |
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support
- Horde & Nomad provisioning for development and testing
- Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API
- Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates
- Some security hardening
- Improved scalability and `zen exec` command
Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/exec_cmd.cpp | 753 |
1 files changed, 733 insertions, 20 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp index 407f42ee3..42c7119e7 100644 --- a/src/zen/cmds/exec_cmd.cpp +++ b/src/zen/cmds/exec_cmd.cpp @@ -2,7 +2,7 @@ #include "exec_cmd.h" -#include <zencompute/functionservice.h> +#include <zencompute/computeservice.h> #include <zencompute/recordingreader.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> @@ -14,9 +14,13 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> #include <zencore/stream.h> #include <zencore/string.h> +#include <zencore/system.h> #include <zencore/timer.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> #include <EASTL/hash_map.h> #include <EASTL/hash_set.h> @@ -47,12 +51,17 @@ ExecCommand::ExecCommand() m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>"); m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>"); m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>"); + m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), "<url>"); m_Options.add_option("", "", "mode", "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", cxxopts::value(m_Mode)->default_value("http"), "<string>"); + m_Options + .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), "<bool>"); + m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), "<path>"); + m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), "<bool>"); m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>"); m_Options.parse_positional("mode"); } @@ -236,16 +245,16 @@ ExecCommand::InProcessExecute() ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; - zen::compute::FunctionServiceSession FunctionSession(Resolver); + zen::compute::ComputeServiceSession ComputeSession(Resolver); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); - FunctionSession.AddLocalRunner(Resolver, TempPath); + ComputeSession.AddLocalRunner(Resolver, TempPath); - return ExecUsingSession(FunctionSession); + return ExecUsingSession(ComputeSession); } int -ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession) +ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession) { struct JobTracker { @@ -281,6 +290,117 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess JobTracker PendingJobs; + struct ActionSummaryEntry + { + int32_t Lsn = 0; + int RecordingIndex = 0; + IoHash ActionId; + std::string FunctionName; + int InputAttachments = 0; + uint64_t InputBytes = 0; + int OutputAttachments = 0; + uint64_t OutputBytes = 0; + float WallSeconds = 0.0f; + float CpuSeconds = 0.0f; + uint64_t SubmittedTicks = 0; + uint64_t StartedTicks = 0; + std::string ExecutionLocation; + }; + + std::mutex SummaryLock; + std::unordered_map<int32_t, ActionSummaryEntry> SummaryEntries; + + ComputeSession.WaitUntilReady(); + + // Register as a client with the orchestrator (best-effort) + + std::string OrchestratorClientId; + + if (!m_OrchestratorUrl.empty()) + { + try + { + HttpClient OrchestratorClient(m_OrchestratorUrl); + + CbObjectWriter Ann; + Ann << "session_id"sv << GetSessionId(); + Ann << "hostname"sv << std::string_view(GetMachineName()); + + CbObjectWriter Meta; + Meta << "source"sv + << "zen-exec"sv; + Ann << "metadata"sv << Meta.Save(); + + auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save()); + if (Resp.IsSuccess()) + { + OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString()); + ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId); + } + else + { + ZEN_WARN("failed to register with orchestrator (status {})", static_cast<int>(Resp.StatusCode)); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("failed to register with orchestrator: {}", Ex.what()); + } + } + + Stopwatch OrchestratorHeartbeatTimer; + + auto SendOrchestratorHeartbeat = [&] { + if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000) + { + return; + } + OrchestratorHeartbeatTimer.Reset(); + try + { + HttpClient OrchestratorClient(m_OrchestratorUrl); + std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId)); + } + catch (...) + { + } + }; + + auto ClientCleanup = MakeGuard([&] { + if (!OrchestratorClientId.empty()) + { + try + { + HttpClient OrchestratorClient(m_OrchestratorUrl); + std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId)); + } + catch (...) + { + } + } + }); + + // Create a queue to group all actions from this exec session + + CbObjectWriter Metadata; + Metadata << "source"sv + << "zen-exec"sv; + + auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save()); + const int QueueId = QueueResult.QueueId; + if (!QueueId) + { + ZEN_ERROR("failed to create compute queue"); + return 1; + } + + auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); }); + + if (!m_OutputPath.empty()) + { + zen::CreateDirectories(m_OutputPath); + } + std::atomic<int> IsDraining{0}; auto DrainCompletedJobs = [&] { @@ -292,7 +412,7 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); CbObjectWriter Cbo; - FunctionSession.GetCompleted(Cbo); + ComputeSession.GetQueueCompleted(QueueId, Cbo); if (CbObject Completed = Cbo.Save()) { @@ -301,10 +421,89 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess int32_t CompleteLsn = It.AsInt32(); CbPackage ResultPackage; - HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); + HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); if (Response == HttpResponseCode::OK) { + if (!m_OutputPath.empty() && ResultPackage) + { + int OutputAttachments = 0; + uint64_t OutputBytes = 0; + + if (!m_Binary) + { + // Write the root object as YAML + ExtendableStringBuilder<4096> YamlStr; + CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr); + + std::string_view Yaml = YamlStr; + zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn), + IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); + + // Write decompressed attachments + auto Attachments = ResultPackage.GetAttachments(); + + if (!Attachments.empty()) + { + std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn); + zen::CreateDirectories(AttDir); + + for (const CbAttachment& Att : Attachments) + { + ++OutputAttachments; + + IoHash AttHash = Att.GetHash(); + + if (Att.IsCompressedBinary()) + { + SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress(); + OutputBytes += Decompressed.GetSize(); + zen::WriteFile(AttDir / AttHash.ToHexString(), + IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); + } + else + { + SharedBuffer Binary = Att.AsBinary(); + OutputBytes += Binary.GetSize(); + zen::WriteFile(AttDir / AttHash.ToHexString(), + IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize())); + } + } + } + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)", + m_OutputPath.string(), + CompleteLsn, + OutputAttachments); + } + } + else + { + CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage); + zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized)); + + for (const CbAttachment& Att : ResultPackage.GetAttachments()) + { + ++OutputAttachments; + OutputBytes += Att.AsBinary().GetSize(); + } + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn); + } + } + + std::lock_guard Lock(SummaryLock); + if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end()) + { + It2->second.OutputAttachments = OutputAttachments; + It2->second.OutputBytes = OutputBytes; + } + } + PendingJobs.Remove(CompleteLsn); ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); @@ -321,7 +520,7 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess { CbPackage WorkerDesc = Kv.second; - FunctionSession.RegisterWorker(WorkerDesc); + ComputeSession.RegisterWorker(WorkerDesc); } // Then submit work items @@ -367,10 +566,14 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess TargetParallelism = 1; } + std::atomic<int> RecordingIndex{0}; + m_RecordingReader->IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { // Enqueue job + const int CurrentRecordingIndex = RecordingIndex++; + Stopwatch SubmitTimer; const int Priority = 0; @@ -404,8 +607,29 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess ObjStr); } - if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult = - FunctionSession.EnqueueAction(ActionObject, Priority)) + if (m_DumpActions) + { + int AttachmentCount = 0; + uint64_t AttachmentBytes = 0; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsAttachment(); + + ++AttachmentCount; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) + { + AttachmentBytes += ChunkData.GetSize(); + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + zen::CompactBinaryToYaml(ActionObject, ObjStr); + ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); + } + + if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult = + ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority)) { const int32_t LsnField = EnqueueResult.Lsn; @@ -421,6 +645,96 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess RemainingWorkItems); } + if (!m_OutputPath.empty()) + { + ActionSummaryEntry Entry; + Entry.Lsn = LsnField; + Entry.RecordingIndex = CurrentRecordingIndex; + Entry.ActionId = ActionId; + Entry.FunctionName = std::string(ActionObject["Function"sv].AsString()); + + if (!m_Binary) + { + // Write action object as YAML + ExtendableStringBuilder<4096> YamlStr; + CompactBinaryToYaml(ActionObject, YamlStr); + + std::string_view Yaml = YamlStr; + zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField), + IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); + + // Write decompressed input attachments + std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField); + bool AttDirCreated = false; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachCid = Field.AsAttachment(); + ++Entry.InputAttachments; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); + SharedBuffer Decompressed = Compressed.Decompress(); + + Entry.InputBytes += Decompressed.GetSize(); + + if (!AttDirCreated) + { + zen::CreateDirectories(AttDir); + AttDirCreated = true; + } + + zen::WriteFile(AttDir / AttachCid.ToHexString(), + IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); + } + }); + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", + m_OutputPath.string(), + LsnField, + Entry.InputAttachments); + } + } + else + { + // Build a CbPackage from the action and write as .pkg + CbPackage ActionPackage; + ActionPackage.SetObject(ActionObject); + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachCid = Field.AsAttachment(); + ++Entry.InputAttachments; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); + + Entry.InputBytes += ChunkData.GetSize(); + ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + } + }); + + CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage); + zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized)); + + if (!m_QuietLogging) + { + ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField); + } + } + + std::lock_guard Lock(SummaryLock); + SummaryEntries.emplace(LsnField, std::move(Entry)); + } + PendingJobs.Insert(LsnField); } else @@ -450,6 +764,7 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess // Check for completed work DrainCompletedJobs(); + SendOrchestratorHeartbeat(); }, TargetParallelism); @@ -461,6 +776,394 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess zen::Sleep(500); DrainCompletedJobs(); + SendOrchestratorHeartbeat(); + } + + // Merge timing data from queue history into summary entries + + if (!SummaryEntries.empty()) + { + // RunnerAction::State indices (can't include functionrunner.h from here) + constexpr int kStateNew = 0; + constexpr int kStatePending = 1; + constexpr int kStateRunning = 3; + constexpr int kStateCompleted = 4; // first terminal state + constexpr int kStateCount = 8; + + for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0)) + { + std::lock_guard Lock(SummaryLock); + if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end()) + { + // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled) + uint64_t EndTick = 0; + for (int S = kStateCompleted; S < kStateCount; ++S) + { + if (HistEntry.Timestamps[S] != 0) + { + EndTick = HistEntry.Timestamps[S]; + break; + } + } + uint64_t StartTick = HistEntry.Timestamps[kStateNew]; + if (EndTick > StartTick) + { + It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond)); + } + It->second.CpuSeconds = HistEntry.CpuSeconds; + It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending]; + It->second.StartedTicks = HistEntry.Timestamps[kStateRunning]; + It->second.ExecutionLocation = HistEntry.ExecutionLocation; + } + } + } + + // Write summary file if output path is set + + if (!m_OutputPath.empty() && !SummaryEntries.empty()) + { + std::vector<ActionSummaryEntry> Sorted; + Sorted.reserve(SummaryEntries.size()); + for (auto& [_, Entry] : SummaryEntries) + { + Sorted.push_back(std::move(Entry)); + } + + std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) { + return A.RecordingIndex < B.RecordingIndex; + }); + + auto FormatTimestamp = [](uint64_t Ticks) -> std::string { + if (Ticks == 0) + { + return "-"; + } + return DateTime(Ticks).ToString("%H:%M:%S.%s"); + }; + + ExtendableStringBuilder<4096> Summary; + Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n", + "LSN", + "Index", + "ActionId", + "Function", + "InAtt", + "InBytes", + "OutAtt", + "OutBytes", + "Wall(s)", + "CPU(s)", + "Submitted", + "Started", + "Location")); + Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "")); + + for (const ActionSummaryEntry& Entry : Sorted) + { + Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n", + Entry.Lsn, + Entry.RecordingIndex, + Entry.ActionId, + Entry.FunctionName, + Entry.InputAttachments, + NiceBytes(Entry.InputBytes), + Entry.OutputAttachments, + NiceBytes(Entry.OutputBytes), + Entry.WallSeconds, + Entry.CpuSeconds, + FormatTimestamp(Entry.SubmittedTicks), + FormatTimestamp(Entry.StartedTicks), + Entry.ExecutionLocation)); + } + + std::filesystem::path SummaryPath = m_OutputPath / "summary.txt"; + std::string_view SummaryStr = Summary; + zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size())); + + ZEN_CONSOLE("wrote summary to {}", SummaryPath.string()); + + if (!m_Binary) + { + auto EscapeHtml = [](std::string_view Input) -> std::string { + std::string Out; + Out.reserve(Input.size()); + for (char C : Input) + { + switch (C) + { + case '&': + Out += "&"; + break; + case '<': + Out += "<"; + break; + case '>': + Out += ">"; + break; + case '"': + Out += """; + break; + case '\'': + Out += "'"; + break; + default: + Out += C; + } + } + return Out; + }; + + auto EscapeJson = [](std::string_view Input) -> std::string { + std::string Out; + Out.reserve(Input.size()); + for (char C : Input) + { + switch (C) + { + case '"': + Out += "\\\""; + break; + case '\\': + Out += "\\\\"; + break; + case '\n': + Out += "\\n"; + break; + case '\r': + Out += "\\r"; + break; + case '\t': + Out += "\\t"; + break; + default: + if (static_cast<unsigned char>(C) < 0x20) + { + Out += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(C))); + } + else + { + Out += C; + } + } + } + return Out; + }; + + ExtendableStringBuilder<8192> Html; + + Html.Append(std::string_view(R"(<!DOCTYPE html> +<html><head><meta charset="utf-8"><title>Exec Summary</title> +<style> +body{font-family:system-ui,sans-serif;margin:20px;background:#fafafa} +#container{overflow-y:auto;height:calc(100vh - 120px)} +table{border-collapse:collapse;width:100%} +th,td{border:1px solid #ddd;padding:6px 10px;text-align:left;white-space:nowrap} +th{background:#f0f0f0;cursor:pointer;user-select:none;position:sticky;top:0;z-index:1} +th:hover{background:#e0e0e0} +th .arrow{font-size:0.7em;margin-left:4px} +tr:hover{background:#e8f0fe} +input{padding:6px 10px;margin-bottom:12px;width:300px;border:1px solid #ccc;border-radius:4px} +button{padding:6px 14px;margin-left:8px;margin-bottom:12px;border:1px solid #ccc;border-radius:4px;background:#f0f0f0;cursor:pointer} +button:hover{background:#e0e0e0} +a{color:#1a73e8;text-decoration:none} +a:hover{text-decoration:underline} +.num{text-align:right} +</style></head><body> +<h2>Exec Summary</h2> +<input type="text" id="filter" placeholder="Filter by function name..."><button id="csvBtn">Export CSV</button> +<div id="container"> +<table><thead><tr> +<th data-col="0">LSN <span class="arrow"></span></th> +<th data-col="1">Index <span class="arrow"></span></th> +<th data-col="2">Action ID <span class="arrow"></span></th> +<th data-col="3">Function <span class="arrow"></span></th> +<th data-col="4">In Attachments <span class="arrow"></span></th> +<th data-col="5">In Bytes <span class="arrow"></span></th> +<th data-col="6">Out Attachments <span class="arrow"></span></th> +<th data-col="7">Out Bytes <span class="arrow"></span></th> +<th data-col="8">Wall(s) <span class="arrow"></span></th> +<th data-col="9">CPU(s) <span class="arrow"></span></th> +<th data-col="10">Submitted <span class="arrow"></span></th> +<th data-col="11">Started <span class="arrow"></span></th> +<th data-col="12">Location <span class="arrow"></span></th> +</tr></thead><tbody> +<tr id="spacerTop"><td colspan="13"></td></tr> +<tr id="spacerBot"><td colspan="13"></td></tr> +</tbody></table></div> +<script> +const DATA=[ +)")); + + std::string_view ResultExt = ".result.yaml"; + std::string_view ActionExt = ".action.yaml"; + + for (const ActionSummaryEntry& Entry : Sorted) + { + std::string SafeName = EscapeJson(EscapeHtml(Entry.FunctionName)); + std::string ActionIdStr = Entry.ActionId.ToHexString(); + std::string ActionLink; + if (!ActionExt.empty()) + { + ActionLink = EscapeJson(fmt::format(" <a href=\"{}{}\">[action]</a>", Entry.Lsn, ActionExt)); + } + + // Indices: 0=lsn, 1=idx, 2=actionId, 3=fn, 4=inAtt, 5=inBytes, 6=outAtt, 7=outBytes, + // 8=wall, 9=cpu, 10=niceBytesIn, 11=niceBytesOut, 12=actionLink, + // 13=submittedTicks, 14=startedTicks, 15=submittedDisplay, 16=startedDisplay, + // 17=location + Html.Append( + fmt::format("[{},{},\"{}\",\"{}\",{},{},{},{},{:.6f},{:.6f},\"{}\",\"{}\",\"{}\",{},{},\"{}\",\"{}\",\"{}\"],\n", + Entry.Lsn, + Entry.RecordingIndex, + ActionIdStr, + SafeName, + Entry.InputAttachments, + Entry.InputBytes, + Entry.OutputAttachments, + Entry.OutputBytes, + Entry.WallSeconds, + Entry.CpuSeconds, + EscapeJson(NiceBytes(Entry.InputBytes)), + EscapeJson(NiceBytes(Entry.OutputBytes)), + ActionLink, + Entry.SubmittedTicks, + Entry.StartedTicks, + FormatTimestamp(Entry.SubmittedTicks), + FormatTimestamp(Entry.StartedTicks), + EscapeJson(EscapeHtml(Entry.ExecutionLocation)))); + } + + Html.Append(fmt::format(R"(]; +const RESULT_EXT="{}"; +)", + ResultExt)); + + Html.Append(std::string_view(R"JS((function(){ +const ROW_H=33,BUF=20; +const container=document.getElementById("container"); +const tbody=container.querySelector("tbody"); +const headers=container.querySelectorAll("th"); +const filterInput=document.getElementById("filter"); +const spacerTop=document.getElementById("spacerTop"); +const spacerBot=document.getElementById("spacerBot"); +let view=[...DATA.keys()]; +let sortCol=-1,sortAsc=true; +const COLS=[ + {f:0,t:"n"},{f:1,t:"n"},{f:2,t:"s"},{f:3,t:"s"}, + {f:4,t:"n"},{f:5,t:"n"},{f:6,t:"n"},{f:7,t:"n"}, + {f:8,t:"n"},{f:9,t:"n"},{f:13,t:"n"},{f:14,t:"n"},{f:17,t:"s"} +]; +function rowHtml(i){ + const d=DATA[view[i]]; + const bg=i%2?' style="background:#f9f9f9"':''; + return '<tr'+bg+'>'+ + '<td class="num"><a href="'+d[0]+RESULT_EXT+'">'+d[0]+'</a></td>'+ + '<td class="num">'+d[1]+'</td>'+ + '<td><code>'+d[2]+'</code></td>'+ + '<td>'+d[3]+d[12]+'</td>'+ + '<td class="num">'+d[4]+'</td>'+ + '<td class="num">'+d[10]+'</td>'+ + '<td class="num">'+d[6]+'</td>'+ + '<td class="num">'+d[11]+'</td>'+ + '<td class="num">'+d[8].toFixed(2)+'</td>'+ + '<td class="num">'+d[9].toFixed(2)+'</td>'+ + '<td class="num">'+d[15]+'</td>'+ + '<td class="num">'+d[16]+'</td>'+ + '<td>'+d[17]+'</td></tr>'; +} +let lastFirst=-1,lastLast=-1; +function render(){ + const scrollTop=container.scrollTop; + const viewH=container.clientHeight; + let first=Math.floor(scrollTop/ROW_H)-BUF; + let last=Math.ceil((scrollTop+viewH)/ROW_H)+BUF; + if(first<0) first=0; + if(last>=view.length) last=view.length-1; + if(first===lastFirst&&last===lastLast) return; + lastFirst=first;lastLast=last; + const rows=[]; + for(let i=first;i<=last;i++) rows.push(rowHtml(i)); + spacerTop.style.height=(first*ROW_H)+'px'; + spacerBot.style.height=((view.length-1-last)*ROW_H)+'px'; + const mid=rows.join(''); + const topTr='<tr id="spacerTop"><td colspan="13" style="border:0;padding:0;height:'+spacerTop.style.height+'"></td></tr>'; + const botTr='<tr id="spacerBot"><td colspan="13" style="border:0;padding:0;height:'+spacerBot.style.height+'"></td></tr>'; + tbody.innerHTML=topTr+mid+botTr; +} +function applySort(){ + if(sortCol<0) return; + const c=COLS[sortCol]; + view.sort((a,b)=>{ + const va=DATA[a][c.f],vb=DATA[b][c.f]; + if(c.t==="n") return sortAsc?va-vb:vb-va; + return sortAsc?(va<vb?-1:va>vb?1:0):(va>vb?-1:va<vb?1:0); + }); +} +function rebuild(){ + const q=filterInput.value.toLowerCase(); + view=[]; + for(let i=0;i<DATA.length;i++){ + if(!q||DATA[i][3].toLowerCase().includes(q)) view.push(i); + } + applySort(); + lastFirst=lastLast=-1; + render(); +} +headers.forEach(th=>{ + th.addEventListener("click",()=>{ + const col=parseInt(th.dataset.col); + if(sortCol===col){sortAsc=!sortAsc}else{sortCol=col;sortAsc=true} + headers.forEach(h=>h.querySelector(".arrow").textContent=""); + th.querySelector(".arrow").textContent=sortAsc?"\u25B2":"\u25BC"; + applySort(); + lastFirst=lastLast=-1; + render(); + }); +}); +filterInput.addEventListener("input",()=>rebuild()); +let ticking=false; +container.addEventListener("scroll",()=>{ + if(!ticking){ticking=true;requestAnimationFrame(()=>{render();ticking=false})} +}); +rebuild(); +document.getElementById("csvBtn").addEventListener("click",()=>{ + const H=["LSN","Index","Action ID","Function","In Attachments","In Bytes","Out Attachments","Out Bytes","Wall(s)","CPU(s)","Submitted","Started","Location"]; + const esc=v=>{const s=String(v);return s.includes(',')||s.includes('"')||s.includes('\n')?'"'+s.replace(/"/g,'""')+'"':s}; + const rows=[H.join(",")]; + for(let i=0;i<view.length;i++){ + const d=DATA[view[i]]; + rows.push([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7],d[8],d[9],d[15],d[16],d[17]].map(esc).join(",")); + } + const blob=new Blob([rows.join("\n")],{type:"text/csv"}); + const a=document.createElement("a"); + a.href=URL.createObjectURL(blob); + a.download="summary.csv"; + a.click(); + URL.revokeObjectURL(a.href); +}); +})(); +</script></body></html> +)JS")); + + std::filesystem::path HtmlPath = m_OutputPath / "summary.html"; + std::string_view HtmlStr = Html; + zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size())); + + ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string()); + } } if (FailedWorkCounter) @@ -491,10 +1194,10 @@ ExecCommand::HttpExecute() std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); - zen::compute::FunctionServiceSession FunctionSession(Resolver); - FunctionSession.AddRemoteRunner(Resolver, TempPath, m_HostName); + zen::compute::ComputeServiceSession ComputeSession(Resolver); + ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName); - return ExecUsingSession(FunctionSession); + return ExecUsingSession(ComputeSession); } int @@ -504,11 +1207,21 @@ ExecCommand::BeaconExecute() ChunkResolver& Resolver = *m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); - zen::compute::FunctionServiceSession FunctionSession(Resolver); - FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); - // FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://10.99.9.246:8558"); + zen::compute::ComputeServiceSession ComputeSession(Resolver); + + if (!m_OrchestratorUrl.empty()) + { + ZEN_CONSOLE_INFO("using orchestrator at {}", m_OrchestratorUrl); + ComputeSession.SetOrchestratorEndpoint(m_OrchestratorUrl); + ComputeSession.SetOrchestratorBasePath(TempPath); + } + else + { + ZEN_CONSOLE_INFO("note: using hard-coded local worker path"); + ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); + } - return ExecUsingSession(FunctionSession); + return ExecUsingSession(ComputeSession); } ////////////////////////////////////////////////////////////////////////// @@ -635,10 +1348,10 @@ ExecCommand::BuildActionsLog() std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); - zen::compute::FunctionServiceSession FunctionSession(Resolver); - FunctionSession.StartRecording(Resolver, m_RecordingLogPath); + zen::compute::ComputeServiceSession ComputeSession(Resolver); + ComputeSession.StartRecording(Resolver, m_RecordingLogPath); - return ExecUsingSession(FunctionSession); + return ExecUsingSession(ComputeSession); } void |