aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-04 14:13:46 +0100
committerGitHub Enterprise <[email protected]>2026-03-04 14:13:46 +0100
commit0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch)
tree074575ba6ea259044a179eab0bb396d37268fb09 /src/zen/cmds/exec_cmd.cpp
parentnative xmake toolchain definition for UE-clang (#805) (diff)
downloadarchived-zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz
archived-zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support - Horde & Nomad provisioning for development and testing - Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API - Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates - Some security hardening - Improved scalability and `zen exec` command Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp753
1 files changed, 733 insertions, 20 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
index 407f42ee3..42c7119e7 100644
--- a/src/zen/cmds/exec_cmd.cpp
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -2,7 +2,7 @@
#include "exec_cmd.h"
-#include <zencompute/functionservice.h>
+#include <zencompute/computeservice.h>
#include <zencompute/recordingreader.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
@@ -14,9 +14,13 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include <zencore/string.h>
+#include <zencore/system.h>
#include <zencore/timer.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/packageformat.h>
#include <EASTL/hash_map.h>
#include <EASTL/hash_set.h>
@@ -47,12 +51,17 @@ ExecCommand::ExecCommand()
m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>");
m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>");
m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>");
+ m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), "<url>");
m_Options.add_option("",
"",
"mode",
"Select execution mode (http,inproc,dump,direct,beacon,buildlog)",
cxxopts::value(m_Mode)->default_value("http"),
"<string>");
+ m_Options
+ .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), "<bool>");
+ m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), "<path>");
+ m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), "<bool>");
m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>");
m_Options.parse_positional("mode");
}
@@ -236,16 +245,16 @@ ExecCommand::InProcessExecute()
ZEN_ASSERT(m_ChunkResolver);
ChunkResolver& Resolver = *m_ChunkResolver;
- zen::compute::FunctionServiceSession FunctionSession(Resolver);
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
- FunctionSession.AddLocalRunner(Resolver, TempPath);
+ ComputeSession.AddLocalRunner(Resolver, TempPath);
- return ExecUsingSession(FunctionSession);
+ return ExecUsingSession(ComputeSession);
}
int
-ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession)
+ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession)
{
struct JobTracker
{
@@ -281,6 +290,117 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
JobTracker PendingJobs;
+ struct ActionSummaryEntry
+ {
+ int32_t Lsn = 0;
+ int RecordingIndex = 0;
+ IoHash ActionId;
+ std::string FunctionName;
+ int InputAttachments = 0;
+ uint64_t InputBytes = 0;
+ int OutputAttachments = 0;
+ uint64_t OutputBytes = 0;
+ float WallSeconds = 0.0f;
+ float CpuSeconds = 0.0f;
+ uint64_t SubmittedTicks = 0;
+ uint64_t StartedTicks = 0;
+ std::string ExecutionLocation;
+ };
+
+ std::mutex SummaryLock;
+ std::unordered_map<int32_t, ActionSummaryEntry> SummaryEntries;
+
+ ComputeSession.WaitUntilReady();
+
+ // Register as a client with the orchestrator (best-effort)
+
+ std::string OrchestratorClientId;
+
+ if (!m_OrchestratorUrl.empty())
+ {
+ try
+ {
+ HttpClient OrchestratorClient(m_OrchestratorUrl);
+
+ CbObjectWriter Ann;
+ Ann << "session_id"sv << GetSessionId();
+ Ann << "hostname"sv << std::string_view(GetMachineName());
+
+ CbObjectWriter Meta;
+ Meta << "source"sv
+ << "zen-exec"sv;
+ Ann << "metadata"sv << Meta.Save();
+
+ auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save());
+ if (Resp.IsSuccess())
+ {
+ OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString());
+ ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId);
+ }
+ else
+ {
+ ZEN_WARN("failed to register with orchestrator (status {})", static_cast<int>(Resp.StatusCode));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("failed to register with orchestrator: {}", Ex.what());
+ }
+ }
+
+ Stopwatch OrchestratorHeartbeatTimer;
+
+ auto SendOrchestratorHeartbeat = [&] {
+ if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000)
+ {
+ return;
+ }
+ OrchestratorHeartbeatTimer.Reset();
+ try
+ {
+ HttpClient OrchestratorClient(m_OrchestratorUrl);
+ std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId));
+ }
+ catch (...)
+ {
+ }
+ };
+
+ auto ClientCleanup = MakeGuard([&] {
+ if (!OrchestratorClientId.empty())
+ {
+ try
+ {
+ HttpClient OrchestratorClient(m_OrchestratorUrl);
+ std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId));
+ }
+ catch (...)
+ {
+ }
+ }
+ });
+
+ // Create a queue to group all actions from this exec session
+
+ CbObjectWriter Metadata;
+ Metadata << "source"sv
+ << "zen-exec"sv;
+
+ auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save());
+ const int QueueId = QueueResult.QueueId;
+ if (!QueueId)
+ {
+ ZEN_ERROR("failed to create compute queue");
+ return 1;
+ }
+
+ auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); });
+
+ if (!m_OutputPath.empty())
+ {
+ zen::CreateDirectories(m_OutputPath);
+ }
+
std::atomic<int> IsDraining{0};
auto DrainCompletedJobs = [&] {
@@ -292,7 +412,7 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); });
CbObjectWriter Cbo;
- FunctionSession.GetCompleted(Cbo);
+ ComputeSession.GetQueueCompleted(QueueId, Cbo);
if (CbObject Completed = Cbo.Save())
{
@@ -301,10 +421,89 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
int32_t CompleteLsn = It.AsInt32();
CbPackage ResultPackage;
- HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage);
+ HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage);
if (Response == HttpResponseCode::OK)
{
+ if (!m_OutputPath.empty() && ResultPackage)
+ {
+ int OutputAttachments = 0;
+ uint64_t OutputBytes = 0;
+
+ if (!m_Binary)
+ {
+ // Write the root object as YAML
+ ExtendableStringBuilder<4096> YamlStr;
+ CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr);
+
+ std::string_view Yaml = YamlStr;
+ zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn),
+ IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
+
+ // Write decompressed attachments
+ auto Attachments = ResultPackage.GetAttachments();
+
+ if (!Attachments.empty())
+ {
+ std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn);
+ zen::CreateDirectories(AttDir);
+
+ for (const CbAttachment& Att : Attachments)
+ {
+ ++OutputAttachments;
+
+ IoHash AttHash = Att.GetHash();
+
+ if (Att.IsCompressedBinary())
+ {
+ SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress();
+ OutputBytes += Decompressed.GetSize();
+ zen::WriteFile(AttDir / AttHash.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
+ }
+ else
+ {
+ SharedBuffer Binary = Att.AsBinary();
+ OutputBytes += Binary.GetSize();
+ zen::WriteFile(AttDir / AttHash.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize()));
+ }
+ }
+ }
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)",
+ m_OutputPath.string(),
+ CompleteLsn,
+ OutputAttachments);
+ }
+ }
+ else
+ {
+ CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage);
+ zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized));
+
+ for (const CbAttachment& Att : ResultPackage.GetAttachments())
+ {
+ ++OutputAttachments;
+ OutputBytes += Att.AsBinary().GetSize();
+ }
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn);
+ }
+ }
+
+ std::lock_guard Lock(SummaryLock);
+ if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end())
+ {
+ It2->second.OutputAttachments = OutputAttachments;
+ It2->second.OutputBytes = OutputBytes;
+ }
+ }
+
PendingJobs.Remove(CompleteLsn);
ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize());
@@ -321,7 +520,7 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
{
CbPackage WorkerDesc = Kv.second;
- FunctionSession.RegisterWorker(WorkerDesc);
+ ComputeSession.RegisterWorker(WorkerDesc);
}
// Then submit work items
@@ -367,10 +566,14 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
TargetParallelism = 1;
}
+ std::atomic<int> RecordingIndex{0};
+
m_RecordingReader->IterateActions(
[&](CbObject ActionObject, const IoHash& ActionId) {
// Enqueue job
+ const int CurrentRecordingIndex = RecordingIndex++;
+
Stopwatch SubmitTimer;
const int Priority = 0;
@@ -404,8 +607,29 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
ObjStr);
}
- if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult =
- FunctionSession.EnqueueAction(ActionObject, Priority))
+ if (m_DumpActions)
+ {
+ int AttachmentCount = 0;
+ uint64_t AttachmentBytes = 0;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachData = Field.AsAttachment();
+
+ ++AttachmentCount;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
+ {
+ AttachmentBytes += ChunkData.GetSize();
+ }
+ });
+
+ zen::ExtendableStringBuilder<1024> ObjStr;
+ zen::CompactBinaryToYaml(ActionObject, ObjStr);
+ ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr);
+ }
+
+ if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult =
+ ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority))
{
const int32_t LsnField = EnqueueResult.Lsn;
@@ -421,6 +645,96 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
RemainingWorkItems);
}
+ if (!m_OutputPath.empty())
+ {
+ ActionSummaryEntry Entry;
+ Entry.Lsn = LsnField;
+ Entry.RecordingIndex = CurrentRecordingIndex;
+ Entry.ActionId = ActionId;
+ Entry.FunctionName = std::string(ActionObject["Function"sv].AsString());
+
+ if (!m_Binary)
+ {
+ // Write action object as YAML
+ ExtendableStringBuilder<4096> YamlStr;
+ CompactBinaryToYaml(ActionObject, YamlStr);
+
+ std::string_view Yaml = YamlStr;
+ zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField),
+ IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
+
+ // Write decompressed input attachments
+ std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField);
+ bool AttDirCreated = false;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachCid = Field.AsAttachment();
+ ++Entry.InputAttachments;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid))
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
+ SharedBuffer Decompressed = Compressed.Decompress();
+
+ Entry.InputBytes += Decompressed.GetSize();
+
+ if (!AttDirCreated)
+ {
+ zen::CreateDirectories(AttDir);
+ AttDirCreated = true;
+ }
+
+ zen::WriteFile(AttDir / AttachCid.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
+ }
+ });
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)",
+ m_OutputPath.string(),
+ LsnField,
+ Entry.InputAttachments);
+ }
+ }
+ else
+ {
+ // Build a CbPackage from the action and write as .pkg
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionObject);
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachCid = Field.AsAttachment();
+ ++Entry.InputAttachments;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid))
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
+
+ Entry.InputBytes += ChunkData.GetSize();
+ ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ }
+ });
+
+ CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage);
+ zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized));
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField);
+ }
+ }
+
+ std::lock_guard Lock(SummaryLock);
+ SummaryEntries.emplace(LsnField, std::move(Entry));
+ }
+
PendingJobs.Insert(LsnField);
}
else
@@ -450,6 +764,7 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
// Check for completed work
DrainCompletedJobs();
+ SendOrchestratorHeartbeat();
},
TargetParallelism);
@@ -461,6 +776,394 @@ ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSess
zen::Sleep(500);
DrainCompletedJobs();
+ SendOrchestratorHeartbeat();
+ }
+
+ // Merge timing data from queue history into summary entries
+
+ if (!SummaryEntries.empty())
+ {
+ // RunnerAction::State indices (can't include functionrunner.h from here)
+ constexpr int kStateNew = 0;
+ constexpr int kStatePending = 1;
+ constexpr int kStateRunning = 3;
+ constexpr int kStateCompleted = 4; // first terminal state
+ constexpr int kStateCount = 8;
+
+ for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0))
+ {
+ std::lock_guard Lock(SummaryLock);
+ if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end())
+ {
+ // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled)
+ uint64_t EndTick = 0;
+ for (int S = kStateCompleted; S < kStateCount; ++S)
+ {
+ if (HistEntry.Timestamps[S] != 0)
+ {
+ EndTick = HistEntry.Timestamps[S];
+ break;
+ }
+ }
+ uint64_t StartTick = HistEntry.Timestamps[kStateNew];
+ if (EndTick > StartTick)
+ {
+ It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond));
+ }
+ It->second.CpuSeconds = HistEntry.CpuSeconds;
+ It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending];
+ It->second.StartedTicks = HistEntry.Timestamps[kStateRunning];
+ It->second.ExecutionLocation = HistEntry.ExecutionLocation;
+ }
+ }
+ }
+
+ // Write summary file if output path is set
+
+ if (!m_OutputPath.empty() && !SummaryEntries.empty())
+ {
+ std::vector<ActionSummaryEntry> Sorted;
+ Sorted.reserve(SummaryEntries.size());
+ for (auto& [_, Entry] : SummaryEntries)
+ {
+ Sorted.push_back(std::move(Entry));
+ }
+
+ std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) {
+ return A.RecordingIndex < B.RecordingIndex;
+ });
+
+ auto FormatTimestamp = [](uint64_t Ticks) -> std::string {
+ if (Ticks == 0)
+ {
+ return "-";
+ }
+ return DateTime(Ticks).ToString("%H:%M:%S.%s");
+ };
+
+ ExtendableStringBuilder<4096> Summary;
+ Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n",
+ "LSN",
+ "Index",
+ "ActionId",
+ "Function",
+ "InAtt",
+ "InBytes",
+ "OutAtt",
+ "OutBytes",
+ "Wall(s)",
+ "CPU(s)",
+ "Submitted",
+ "Started",
+ "Location"));
+ Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ ""));
+
+ for (const ActionSummaryEntry& Entry : Sorted)
+ {
+ Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n",
+ Entry.Lsn,
+ Entry.RecordingIndex,
+ Entry.ActionId,
+ Entry.FunctionName,
+ Entry.InputAttachments,
+ NiceBytes(Entry.InputBytes),
+ Entry.OutputAttachments,
+ NiceBytes(Entry.OutputBytes),
+ Entry.WallSeconds,
+ Entry.CpuSeconds,
+ FormatTimestamp(Entry.SubmittedTicks),
+ FormatTimestamp(Entry.StartedTicks),
+ Entry.ExecutionLocation));
+ }
+
+ std::filesystem::path SummaryPath = m_OutputPath / "summary.txt";
+ std::string_view SummaryStr = Summary;
+ zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size()));
+
+ ZEN_CONSOLE("wrote summary to {}", SummaryPath.string());
+
+ if (!m_Binary)
+ {
+ auto EscapeHtml = [](std::string_view Input) -> std::string {
+ std::string Out;
+ Out.reserve(Input.size());
+ for (char C : Input)
+ {
+ switch (C)
+ {
+ case '&':
+ Out += "&amp;";
+ break;
+ case '<':
+ Out += "&lt;";
+ break;
+ case '>':
+ Out += "&gt;";
+ break;
+ case '"':
+ Out += "&quot;";
+ break;
+ case '\'':
+ Out += "&#39;";
+ break;
+ default:
+ Out += C;
+ }
+ }
+ return Out;
+ };
+
+ auto EscapeJson = [](std::string_view Input) -> std::string {
+ std::string Out;
+ Out.reserve(Input.size());
+ for (char C : Input)
+ {
+ switch (C)
+ {
+ case '"':
+ Out += "\\\"";
+ break;
+ case '\\':
+ Out += "\\\\";
+ break;
+ case '\n':
+ Out += "\\n";
+ break;
+ case '\r':
+ Out += "\\r";
+ break;
+ case '\t':
+ Out += "\\t";
+ break;
+ default:
+ if (static_cast<unsigned char>(C) < 0x20)
+ {
+ Out += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(C)));
+ }
+ else
+ {
+ Out += C;
+ }
+ }
+ }
+ return Out;
+ };
+
+ ExtendableStringBuilder<8192> Html;
+
+ Html.Append(std::string_view(R"(<!DOCTYPE html>
+<html><head><meta charset="utf-8"><title>Exec Summary</title>
+<style>
+body{font-family:system-ui,sans-serif;margin:20px;background:#fafafa}
+#container{overflow-y:auto;height:calc(100vh - 120px)}
+table{border-collapse:collapse;width:100%}
+th,td{border:1px solid #ddd;padding:6px 10px;text-align:left;white-space:nowrap}
+th{background:#f0f0f0;cursor:pointer;user-select:none;position:sticky;top:0;z-index:1}
+th:hover{background:#e0e0e0}
+th .arrow{font-size:0.7em;margin-left:4px}
+tr:hover{background:#e8f0fe}
+input{padding:6px 10px;margin-bottom:12px;width:300px;border:1px solid #ccc;border-radius:4px}
+button{padding:6px 14px;margin-left:8px;margin-bottom:12px;border:1px solid #ccc;border-radius:4px;background:#f0f0f0;cursor:pointer}
+button:hover{background:#e0e0e0}
+a{color:#1a73e8;text-decoration:none}
+a:hover{text-decoration:underline}
+.num{text-align:right}
+</style></head><body>
+<h2>Exec Summary</h2>
+<input type="text" id="filter" placeholder="Filter by function name..."><button id="csvBtn">Export CSV</button>
+<div id="container">
+<table><thead><tr>
+<th data-col="0">LSN <span class="arrow"></span></th>
+<th data-col="1">Index <span class="arrow"></span></th>
+<th data-col="2">Action ID <span class="arrow"></span></th>
+<th data-col="3">Function <span class="arrow"></span></th>
+<th data-col="4">In Attachments <span class="arrow"></span></th>
+<th data-col="5">In Bytes <span class="arrow"></span></th>
+<th data-col="6">Out Attachments <span class="arrow"></span></th>
+<th data-col="7">Out Bytes <span class="arrow"></span></th>
+<th data-col="8">Wall(s) <span class="arrow"></span></th>
+<th data-col="9">CPU(s) <span class="arrow"></span></th>
+<th data-col="10">Submitted <span class="arrow"></span></th>
+<th data-col="11">Started <span class="arrow"></span></th>
+<th data-col="12">Location <span class="arrow"></span></th>
+</tr></thead><tbody>
+<tr id="spacerTop"><td colspan="13"></td></tr>
+<tr id="spacerBot"><td colspan="13"></td></tr>
+</tbody></table></div>
+<script>
+const DATA=[
+)"));
+
+ std::string_view ResultExt = ".result.yaml";
+ std::string_view ActionExt = ".action.yaml";
+
+ for (const ActionSummaryEntry& Entry : Sorted)
+ {
+ std::string SafeName = EscapeJson(EscapeHtml(Entry.FunctionName));
+ std::string ActionIdStr = Entry.ActionId.ToHexString();
+ std::string ActionLink;
+ if (!ActionExt.empty())
+ {
+ ActionLink = EscapeJson(fmt::format(" <a href=\"{}{}\">[action]</a>", Entry.Lsn, ActionExt));
+ }
+
+ // Indices: 0=lsn, 1=idx, 2=actionId, 3=fn, 4=inAtt, 5=inBytes, 6=outAtt, 7=outBytes,
+ // 8=wall, 9=cpu, 10=niceBytesIn, 11=niceBytesOut, 12=actionLink,
+ // 13=submittedTicks, 14=startedTicks, 15=submittedDisplay, 16=startedDisplay,
+ // 17=location
+ Html.Append(
+ fmt::format("[{},{},\"{}\",\"{}\",{},{},{},{},{:.6f},{:.6f},\"{}\",\"{}\",\"{}\",{},{},\"{}\",\"{}\",\"{}\"],\n",
+ Entry.Lsn,
+ Entry.RecordingIndex,
+ ActionIdStr,
+ SafeName,
+ Entry.InputAttachments,
+ Entry.InputBytes,
+ Entry.OutputAttachments,
+ Entry.OutputBytes,
+ Entry.WallSeconds,
+ Entry.CpuSeconds,
+ EscapeJson(NiceBytes(Entry.InputBytes)),
+ EscapeJson(NiceBytes(Entry.OutputBytes)),
+ ActionLink,
+ Entry.SubmittedTicks,
+ Entry.StartedTicks,
+ FormatTimestamp(Entry.SubmittedTicks),
+ FormatTimestamp(Entry.StartedTicks),
+ EscapeJson(EscapeHtml(Entry.ExecutionLocation))));
+ }
+
+ Html.Append(fmt::format(R"(];
+const RESULT_EXT="{}";
+)",
+ ResultExt));
+
+ Html.Append(std::string_view(R"JS((function(){
+const ROW_H=33,BUF=20;
+const container=document.getElementById("container");
+const tbody=container.querySelector("tbody");
+const headers=container.querySelectorAll("th");
+const filterInput=document.getElementById("filter");
+const spacerTop=document.getElementById("spacerTop");
+const spacerBot=document.getElementById("spacerBot");
+let view=[...DATA.keys()];
+let sortCol=-1,sortAsc=true;
+const COLS=[
+ {f:0,t:"n"},{f:1,t:"n"},{f:2,t:"s"},{f:3,t:"s"},
+ {f:4,t:"n"},{f:5,t:"n"},{f:6,t:"n"},{f:7,t:"n"},
+ {f:8,t:"n"},{f:9,t:"n"},{f:13,t:"n"},{f:14,t:"n"},{f:17,t:"s"}
+];
+function rowHtml(i){
+ const d=DATA[view[i]];
+ const bg=i%2?' style="background:#f9f9f9"':'';
+ return '<tr'+bg+'>'+
+ '<td class="num"><a href="'+d[0]+RESULT_EXT+'">'+d[0]+'</a></td>'+
+ '<td class="num">'+d[1]+'</td>'+
+ '<td><code>'+d[2]+'</code></td>'+
+ '<td>'+d[3]+d[12]+'</td>'+
+ '<td class="num">'+d[4]+'</td>'+
+ '<td class="num">'+d[10]+'</td>'+
+ '<td class="num">'+d[6]+'</td>'+
+ '<td class="num">'+d[11]+'</td>'+
+ '<td class="num">'+d[8].toFixed(2)+'</td>'+
+ '<td class="num">'+d[9].toFixed(2)+'</td>'+
+ '<td class="num">'+d[15]+'</td>'+
+ '<td class="num">'+d[16]+'</td>'+
+ '<td>'+d[17]+'</td></tr>';
+}
+let lastFirst=-1,lastLast=-1;
+function render(){
+ const scrollTop=container.scrollTop;
+ const viewH=container.clientHeight;
+ let first=Math.floor(scrollTop/ROW_H)-BUF;
+ let last=Math.ceil((scrollTop+viewH)/ROW_H)+BUF;
+ if(first<0) first=0;
+ if(last>=view.length) last=view.length-1;
+ if(first===lastFirst&&last===lastLast) return;
+ lastFirst=first;lastLast=last;
+ const rows=[];
+ for(let i=first;i<=last;i++) rows.push(rowHtml(i));
+ spacerTop.style.height=(first*ROW_H)+'px';
+ spacerBot.style.height=((view.length-1-last)*ROW_H)+'px';
+ const mid=rows.join('');
+ const topTr='<tr id="spacerTop"><td colspan="13" style="border:0;padding:0;height:'+spacerTop.style.height+'"></td></tr>';
+ const botTr='<tr id="spacerBot"><td colspan="13" style="border:0;padding:0;height:'+spacerBot.style.height+'"></td></tr>';
+ tbody.innerHTML=topTr+mid+botTr;
+}
+function applySort(){
+ if(sortCol<0) return;
+ const c=COLS[sortCol];
+ view.sort((a,b)=>{
+ const va=DATA[a][c.f],vb=DATA[b][c.f];
+ if(c.t==="n") return sortAsc?va-vb:vb-va;
+ return sortAsc?(va<vb?-1:va>vb?1:0):(va>vb?-1:va<vb?1:0);
+ });
+}
+function rebuild(){
+ const q=filterInput.value.toLowerCase();
+ view=[];
+ for(let i=0;i<DATA.length;i++){
+ if(!q||DATA[i][3].toLowerCase().includes(q)) view.push(i);
+ }
+ applySort();
+ lastFirst=lastLast=-1;
+ render();
+}
+headers.forEach(th=>{
+ th.addEventListener("click",()=>{
+ const col=parseInt(th.dataset.col);
+ if(sortCol===col){sortAsc=!sortAsc}else{sortCol=col;sortAsc=true}
+ headers.forEach(h=>h.querySelector(".arrow").textContent="");
+ th.querySelector(".arrow").textContent=sortAsc?"\u25B2":"\u25BC";
+ applySort();
+ lastFirst=lastLast=-1;
+ render();
+ });
+});
+filterInput.addEventListener("input",()=>rebuild());
+let ticking=false;
+container.addEventListener("scroll",()=>{
+ if(!ticking){ticking=true;requestAnimationFrame(()=>{render();ticking=false})}
+});
+rebuild();
+document.getElementById("csvBtn").addEventListener("click",()=>{
+ const H=["LSN","Index","Action ID","Function","In Attachments","In Bytes","Out Attachments","Out Bytes","Wall(s)","CPU(s)","Submitted","Started","Location"];
+ const esc=v=>{const s=String(v);return s.includes(',')||s.includes('"')||s.includes('\n')?'"'+s.replace(/"/g,'""')+'"':s};
+ const rows=[H.join(",")];
+ for(let i=0;i<view.length;i++){
+ const d=DATA[view[i]];
+ rows.push([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7],d[8],d[9],d[15],d[16],d[17]].map(esc).join(","));
+ }
+ const blob=new Blob([rows.join("\n")],{type:"text/csv"});
+ const a=document.createElement("a");
+ a.href=URL.createObjectURL(blob);
+ a.download="summary.csv";
+ a.click();
+ URL.revokeObjectURL(a.href);
+});
+})();
+</script></body></html>
+)JS"));
+
+ std::filesystem::path HtmlPath = m_OutputPath / "summary.html";
+ std::string_view HtmlStr = Html;
+ zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size()));
+
+ ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string());
+ }
}
if (FailedWorkCounter)
@@ -491,10 +1194,10 @@ ExecCommand::HttpExecute()
std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
- zen::compute::FunctionServiceSession FunctionSession(Resolver);
- FunctionSession.AddRemoteRunner(Resolver, TempPath, m_HostName);
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+ ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName);
- return ExecUsingSession(FunctionSession);
+ return ExecUsingSession(ComputeSession);
}
int
@@ -504,11 +1207,21 @@ ExecCommand::BeaconExecute()
ChunkResolver& Resolver = *m_ChunkResolver;
std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
- zen::compute::FunctionServiceSession FunctionSession(Resolver);
- FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558");
- // FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://10.99.9.246:8558");
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+
+ if (!m_OrchestratorUrl.empty())
+ {
+ ZEN_CONSOLE_INFO("using orchestrator at {}", m_OrchestratorUrl);
+ ComputeSession.SetOrchestratorEndpoint(m_OrchestratorUrl);
+ ComputeSession.SetOrchestratorBasePath(TempPath);
+ }
+ else
+ {
+ ZEN_CONSOLE_INFO("note: using hard-coded local worker path");
+ ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558");
+ }
- return ExecUsingSession(FunctionSession);
+ return ExecUsingSession(ComputeSession);
}
//////////////////////////////////////////////////////////////////////////
@@ -635,10 +1348,10 @@ ExecCommand::BuildActionsLog()
std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
- zen::compute::FunctionServiceSession FunctionSession(Resolver);
- FunctionSession.StartRecording(Resolver, m_RecordingLogPath);
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+ ComputeSession.StartRecording(Resolver, m_RecordingLogPath);
- return ExecUsingSession(FunctionSession);
+ return ExecUsingSession(ComputeSession);
}
void