aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:19:10 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:19:10 +0100
commiteba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zen/cmds/exec_cmd.cpp
parentbugfix release - v5.7.23 (#851) (diff)
downloadarchived-zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
archived-zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Made `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extended `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp1858
1 files changed, 979 insertions, 879 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
index cbc153e07..30e860a3f 100644
--- a/src/zen/cmds/exec_cmd.cpp
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -18,6 +18,7 @@
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/system.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
@@ -41,255 +42,122 @@ struct hash<zen::IoHash> : public zen::IoHash::Hasher
namespace zen {
-ExecCommand::ExecCommand()
-{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName), "<hosturl>");
- m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>");
- m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>");
- m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>");
- m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>");
- m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>");
- m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>");
- m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), "<url>");
- m_Options.add_option("",
- "",
- "mode",
- "Select execution mode (http,inproc,dump,direct,beacon,buildlog)",
- cxxopts::value(m_Mode)->default_value("http"),
- "<string>");
- m_Options
- .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), "<bool>");
- m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), "<path>");
- m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), "<bool>");
- m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>");
- m_Options.parse_positional("mode");
-}
-
-ExecCommand::~ExecCommand()
-{
-}
-
-void
-ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
-{
- // Configure
-
- if (!ParseOptions(argc, argv))
- {
- return;
- }
-
- m_HostName = ResolveTargetHostSpec(m_HostName);
-
- if (m_RecordingPath.empty())
- {
- throw OptionParseException("replay path is required!", m_Options.help());
- }
-
- m_VerboseLogging = GlobalOptions.IsVerbose;
- m_QuietLogging = m_Quiet && !m_VerboseLogging;
-
- enum ExecMode
- {
- kHttp,
- kDirect,
- kInproc,
- kDump,
- kBeacon,
- kBuildLog
- } Mode;
-
- if (m_Mode == "http"sv)
- {
- Mode = kHttp;
- }
- else if (m_Mode == "direct"sv)
- {
- Mode = kDirect;
- }
- else if (m_Mode == "inproc"sv)
- {
- Mode = kInproc;
- }
- else if (m_Mode == "dump"sv)
- {
- Mode = kDump;
- }
- else if (m_Mode == "beacon"sv)
- {
- Mode = kBeacon;
- }
- else if (m_Mode == "buildlog"sv)
- {
- Mode = kBuildLog;
- }
- else
- {
- throw OptionParseException("invalid mode specified!", m_Options.help());
- }
+namespace {
- // Gather information from recording path
-
- std::unique_ptr<zen::compute::RecordingReader> Reader;
- std::unique_ptr<zen::compute::UeRecordingReader> UeReader;
-
- std::filesystem::path RecordingPath{m_RecordingPath};
-
- if (!std::filesystem::is_directory(RecordingPath))
- {
- throw OptionParseException("replay path should be a directory path!", m_Options.help());
- }
- else
+ static std::string EscapeHtml(std::string_view Input)
{
- if (std::filesystem::is_directory(RecordingPath / "cid"))
+ std::string Out;
+ Out.reserve(Input.size());
+ for (char C : Input)
{
- Reader = std::make_unique<zen::compute::RecordingReader>(RecordingPath);
- m_WorkerMap = Reader->ReadWorkers();
- m_ChunkResolver = Reader.get();
- m_RecordingReader = Reader.get();
- }
- else
- {
- UeReader = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath);
- m_WorkerMap = UeReader->ReadWorkers();
- m_ChunkResolver = UeReader.get();
- m_RecordingReader = UeReader.get();
+ switch (C)
+ {
+ case '&':
+ Out += "&amp;";
+ break;
+ case '<':
+ Out += "&lt;";
+ break;
+ case '>':
+ Out += "&gt;";
+ break;
+ case '"':
+ Out += "&quot;";
+ break;
+ case '\'':
+ Out += "&#39;";
+ break;
+ default:
+ Out += C;
+ }
}
+ return Out;
}
- ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount());
-
- for (auto& Kv : m_WorkerMap)
+ static std::string EscapeJson(std::string_view Input)
{
- CbObject WorkerDesc = Kv.second.GetObject();
- const IoHash& WorkerId = Kv.first;
-
- RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId);
-
- if (m_VerboseLogging)
+ std::string Out;
+ Out.reserve(Input.size());
+ for (char C : Input)
{
- zen::ExtendableStringBuilder<1024> ObjStr;
-# if 0
- zen::CompactBinaryToJson(WorkerDesc, ObjStr);
- ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr);
-# else
- zen::CompactBinaryToYaml(WorkerDesc, ObjStr);
- ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr);
-# endif
+ switch (C)
+ {
+ case '"':
+ Out += "\\\"";
+ break;
+ case '\\':
+ Out += "\\\\";
+ break;
+ case '\n':
+ Out += "\\n";
+ break;
+ case '\r':
+ Out += "\\r";
+ break;
+ case '\t':
+ Out += "\\t";
+ break;
+ default:
+ if (static_cast<unsigned char>(C) < 0x20)
+ {
+ Out += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(C)));
+ }
+ else
+ {
+ Out += C;
+ }
+ }
}
+ return Out;
}
- if (m_VerboseLogging)
- {
- EmitFunctionList(m_FunctionList);
- }
-
- // Iterate over work items and dispatch or log them
-
- int ReturnValue = 0;
-
- Stopwatch ExecTimer;
-
- switch (Mode)
- {
- case kHttp:
- // Forward requests to HTTP function service
- ReturnValue = HttpExecute();
- break;
-
- case kDirect:
- // Not currently supported
- ReturnValue = LocalMessagingExecute();
- break;
-
- case kInproc:
- // Handle execution in-core (by spawning child processes)
- ReturnValue = InProcessExecute();
- break;
-
- case kDump:
- // Dump high level information about actions to console
- ReturnValue = DumpWorkItems();
- break;
-
- case kBeacon:
- ReturnValue = BeaconExecute();
- break;
-
- case kBuildLog:
- ReturnValue = BuildActionsLog();
- break;
-
- default:
- ZEN_ERROR("Unknown operating mode! No work submitted");
-
- ReturnValue = 1;
- }
+} // namespace
- ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
-
- if (!ReturnValue)
- {
- ZEN_CONSOLE("all work items completed successfully");
- }
- else
- {
- ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
- }
-}
+//////////////////////////////////////////////////////////////////////////
+// ExecSessionConfig — read-only configuration for a session run
-int
-ExecCommand::InProcessExecute()
+struct ExecSessionConfig
{
- ZEN_ASSERT(m_ChunkResolver);
- ChunkResolver& Resolver = *m_ChunkResolver;
+ zen::ChunkResolver& Resolver;
+ zen::compute::RecordingReaderBase& RecordingReader;
+ const std::unordered_map<zen::IoHash, zen::CbPackage>& WorkerMap;
+ std::vector<ExecFunctionDefinition>& FunctionList; // mutable for EmitFunctionListOnce
+ std::string_view OrchestratorUrl;
+ const std::filesystem::path& OutputPath;
+ int Offset = 0;
+ int Stride = 1;
+ int Limit = 0;
+ bool Verbose = false;
+ bool Quiet = false;
+ bool DumpActions = false;
+ bool Binary = false;
+};
- zen::compute::ComputeServiceSession ComputeSession(Resolver);
+//////////////////////////////////////////////////////////////////////////
+// ExecSessionRunner — owns per-run state, drives the session lifecycle
- std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
- ComputeSession.AddLocalRunner(Resolver, TempPath);
+class ExecSessionRunner
+{
+public:
+ ExecSessionRunner(zen::compute::ComputeServiceSession& Session, const ExecSessionConfig& Config);
+ int Run();
- return ExecUsingSession(ComputeSession);
-}
+private:
+ // Types
-int
-ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession)
-{
struct JobTracker
{
public:
- inline void Insert(int LsnField)
- {
- RwLock::ExclusiveLockScope _(Lock);
- PendingJobs.insert(LsnField);
- }
-
- inline bool IsEmpty() const
- {
- RwLock::SharedLockScope _(Lock);
- return PendingJobs.empty();
- }
-
- inline void Remove(int CompleteLsn)
- {
- RwLock::ExclusiveLockScope _(Lock);
- PendingJobs.erase(CompleteLsn);
- }
-
- inline size_t GetSize() const
- {
- RwLock::SharedLockScope _(Lock);
- return PendingJobs.size();
- }
+ void Insert(int LsnField);
+ bool IsEmpty() const;
+ void Remove(int CompleteLsn);
+ size_t GetSize() const;
private:
mutable RwLock Lock;
std::unordered_set<int> PendingJobs;
};
- JobTracker PendingJobs;
-
struct ActionSummaryEntry
{
int32_t Lsn = 0;
@@ -307,664 +175,471 @@ ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSessio
std::string ExecutionLocation;
};
- std::mutex SummaryLock;
- std::unordered_map<int32_t, ActionSummaryEntry> SummaryEntries;
+ // Methods
+
+ std::string RegisterOrchestratorClient();
+ void SendOrchestratorHeartbeat();
+ void CompleteOrchestratorClient();
+ void DrainCompletedJobs();
+ void SaveResultOutput(int32_t CompleteLsn, CbPackage& ResultPackage);
+ void SaveActionOutput(int32_t Lsn, int RecordingIndex, const IoHash& ActionId, const CbObject& ActionObject);
+ void WriteSummaryFiles();
+ void EmitFunctionListOnce();
+
+ // State
+
+ zen::compute::ComputeServiceSession& m_Session;
+ ExecSessionConfig m_Config;
+ JobTracker m_PendingJobs;
+ std::mutex m_SummaryLock;
+ std::unordered_map<int32_t, ActionSummaryEntry> m_SummaryEntries;
+ int m_QueueId = 0;
+ std::string m_OrchestratorClientId;
+ Stopwatch m_OrchestratorHeartbeatTimer;
+ bool m_FunctionListEmittedOnce = false;
+ std::atomic<int> m_IsDraining{0};
+};
- ComputeSession.WaitUntilReady();
+//////////////////////////////////////////////////////////////////////////
+// ExecSessionRunner::JobTracker
- // Register as a client with the orchestrator (best-effort)
+void
+ExecSessionRunner::JobTracker::Insert(int LsnField)
+{
+ RwLock::ExclusiveLockScope _(Lock);
+ PendingJobs.insert(LsnField);
+}
- std::string OrchestratorClientId;
+bool
+ExecSessionRunner::JobTracker::IsEmpty() const
+{
+ RwLock::SharedLockScope _(Lock);
+ return PendingJobs.empty();
+}
- if (!m_OrchestratorUrl.empty())
+void
+ExecSessionRunner::JobTracker::Remove(int CompleteLsn)
+{
+ RwLock::ExclusiveLockScope _(Lock);
+ PendingJobs.erase(CompleteLsn);
+}
+
+size_t
+ExecSessionRunner::JobTracker::GetSize() const
+{
+ RwLock::SharedLockScope _(Lock);
+ return PendingJobs.size();
+}
+
+//////////////////////////////////////////////////////////////////////////
+// ExecSessionRunner implementation
+
+ExecSessionRunner::ExecSessionRunner(zen::compute::ComputeServiceSession& Session, const ExecSessionConfig& Config)
+: m_Session(Session)
+, m_Config(Config)
+{
+}
+
+std::string
+ExecSessionRunner::RegisterOrchestratorClient()
+{
+ if (m_Config.OrchestratorUrl.empty())
{
- try
- {
- HttpClient OrchestratorClient(m_OrchestratorUrl);
+ return {};
+ }
- CbObjectWriter Ann;
- Ann << "session_id"sv << GetSessionId();
- Ann << "hostname"sv << std::string_view(GetMachineName());
+ try
+ {
+ HttpClient OrchestratorClient(m_Config.OrchestratorUrl);
- CbObjectWriter Meta;
- Meta << "source"sv
- << "zen-exec"sv;
- Ann << "metadata"sv << Meta.Save();
+ CbObjectWriter Ann;
+ Ann << "session_id"sv << GetSessionId();
+ Ann << "hostname"sv << std::string_view(GetMachineName());
- auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save());
- if (Resp.IsSuccess())
- {
- OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString());
- ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId);
- }
- else
- {
- ZEN_WARN("failed to register with orchestrator (status {})", static_cast<int>(Resp.StatusCode));
- }
+ CbObjectWriter Meta;
+ Meta << "source"sv
+ << "zen-exec"sv;
+ Ann << "metadata"sv << Meta.Save();
+
+ auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save());
+ if (Resp.IsSuccess())
+ {
+ std::string ClientId = std::string(Resp.AsObject()["id"].AsString());
+ ZEN_CONSOLE_INFO("registered with orchestrator as {}", ClientId);
+ return ClientId;
}
- catch (const std::exception& Ex)
+ else
{
- ZEN_WARN("failed to register with orchestrator: {}", Ex.what());
+ ZEN_WARN("failed to register with orchestrator (status {})", static_cast<int>(Resp.StatusCode));
}
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("failed to register with orchestrator: {}", Ex.what());
+ }
- Stopwatch OrchestratorHeartbeatTimer;
+ return {};
+}
- auto SendOrchestratorHeartbeat = [&] {
- if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000)
- {
- return;
- }
- OrchestratorHeartbeatTimer.Reset();
+void
+ExecSessionRunner::SendOrchestratorHeartbeat()
+{
+ if (m_OrchestratorClientId.empty() || m_OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000)
+ {
+ return;
+ }
+ m_OrchestratorHeartbeatTimer.Reset();
+ try
+ {
+ HttpClient OrchestratorClient(m_Config.OrchestratorUrl);
+ std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", m_OrchestratorClientId));
+ }
+ catch (...)
+ {
+ }
+}
+
+void
+ExecSessionRunner::CompleteOrchestratorClient()
+{
+ if (!m_OrchestratorClientId.empty())
+ {
try
{
- HttpClient OrchestratorClient(m_OrchestratorUrl);
- std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId));
+ HttpClient OrchestratorClient(m_Config.OrchestratorUrl);
+ std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", m_OrchestratorClientId));
}
catch (...)
{
}
- };
-
- auto ClientCleanup = MakeGuard([&] {
- if (!OrchestratorClientId.empty())
- {
- try
- {
- HttpClient OrchestratorClient(m_OrchestratorUrl);
- std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId));
- }
- catch (...)
- {
- }
- }
- });
-
- // Create a queue to group all actions from this exec session
-
- CbObjectWriter Metadata;
- Metadata << "source"sv
- << "zen-exec"sv;
-
- auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save());
- const int QueueId = QueueResult.QueueId;
- if (!QueueId)
- {
- ZEN_ERROR("failed to create compute queue");
- return 1;
}
+}
- auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); });
-
- if (!m_OutputPath.empty())
+void
+ExecSessionRunner::DrainCompletedJobs()
+{
+ if (m_IsDraining.exchange(1))
{
- zen::CreateDirectories(m_OutputPath);
+ return;
}
- std::atomic<int> IsDraining{0};
+ auto _ = MakeGuard([&] { m_IsDraining.store(0, std::memory_order_release); });
- auto DrainCompletedJobs = [&] {
- if (IsDraining.exchange(1))
- {
- return;
- }
-
- auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); });
+ CbObjectWriter Cbo;
+ m_Session.GetQueueCompleted(m_QueueId, Cbo);
- CbObjectWriter Cbo;
- ComputeSession.GetQueueCompleted(QueueId, Cbo);
-
- if (CbObject Completed = Cbo.Save())
+ if (CbObject Completed = Cbo.Save())
+ {
+ for (auto& It : Completed["completed"sv])
{
- for (auto& It : Completed["completed"sv])
- {
- int32_t CompleteLsn = It.AsInt32();
+ int32_t CompleteLsn = It.AsInt32();
- CbPackage ResultPackage;
- HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage);
+ CbPackage ResultPackage;
+ HttpResponseCode Response = m_Session.GetActionResult(CompleteLsn, /* out */ ResultPackage);
- if (Response == HttpResponseCode::OK)
+ if (Response == HttpResponseCode::OK)
+ {
+ if (!m_Config.OutputPath.empty() && ResultPackage)
{
- if (!m_OutputPath.empty() && ResultPackage)
- {
- int OutputAttachments = 0;
- uint64_t OutputBytes = 0;
-
- if (!m_Binary)
- {
- // Write the root object as YAML
- ExtendableStringBuilder<4096> YamlStr;
- CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr);
-
- std::string_view Yaml = YamlStr;
- zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn),
- IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
-
- // Write decompressed attachments
- auto Attachments = ResultPackage.GetAttachments();
-
- if (!Attachments.empty())
- {
- std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn);
- zen::CreateDirectories(AttDir);
-
- for (const CbAttachment& Att : Attachments)
- {
- ++OutputAttachments;
-
- IoHash AttHash = Att.GetHash();
-
- if (Att.IsCompressedBinary())
- {
- SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress();
- OutputBytes += Decompressed.GetSize();
- zen::WriteFile(AttDir / AttHash.ToHexString(),
- IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
- }
- else
- {
- SharedBuffer Binary = Att.AsBinary();
- OutputBytes += Binary.GetSize();
- zen::WriteFile(AttDir / AttHash.ToHexString(),
- IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize()));
- }
- }
- }
-
- if (!m_QuietLogging)
- {
- ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)",
- m_OutputPath.string(),
- CompleteLsn,
- OutputAttachments);
- }
- }
- else
- {
- CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage);
- zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized));
-
- for (const CbAttachment& Att : ResultPackage.GetAttachments())
- {
- ++OutputAttachments;
- OutputBytes += Att.AsBinary().GetSize();
- }
-
- if (!m_QuietLogging)
- {
- ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn);
- }
- }
-
- std::lock_guard Lock(SummaryLock);
- if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end())
- {
- It2->second.OutputAttachments = OutputAttachments;
- It2->second.OutputBytes = OutputBytes;
- }
- }
+ SaveResultOutput(CompleteLsn, ResultPackage);
+ }
- PendingJobs.Remove(CompleteLsn);
+ m_PendingJobs.Remove(CompleteLsn);
- ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize());
- }
+ ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize());
}
}
- };
-
- // Describe workers
+ }
+}
- ZEN_CONSOLE("describing {} workers", m_WorkerMap.size());
+void
+ExecSessionRunner::SaveResultOutput(int32_t CompleteLsn, CbPackage& ResultPackage)
+{
+ int OutputAttachments = 0;
+ uint64_t OutputBytes = 0;
- for (auto Kv : m_WorkerMap)
+ if (!m_Config.Binary)
{
- CbPackage WorkerDesc = Kv.second;
+ // Write the root object as YAML
+ ExtendableStringBuilder<4096> YamlStr;
+ CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr);
- ComputeSession.RegisterWorker(WorkerDesc);
- }
+ std::string_view Yaml = YamlStr;
+ zen::WriteFile(m_Config.OutputPath / fmt::format("{}.result.yaml", CompleteLsn),
+ IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
- // Then submit work items
+ // Write decompressed attachments
+ auto Attachments = ResultPackage.GetAttachments();
- int FailedWorkCounter = 0;
- size_t RemainingWorkItems = m_RecordingReader->GetActionCount();
- int SubmittedWorkItems = 0;
-
- ZEN_CONSOLE("submitting {} work items", RemainingWorkItems);
+ if (!Attachments.empty())
+ {
+ std::filesystem::path AttDir = m_Config.OutputPath / fmt::format("{}.result.attachments", CompleteLsn);
+ zen::CreateDirectories(AttDir);
- int OffsetCounter = m_Offset;
- int StrideCounter = m_Stride;
+ for (const CbAttachment& Att : Attachments)
+ {
+ ++OutputAttachments;
- auto ShouldSchedule = [&]() -> bool {
- if (m_Limit && SubmittedWorkItems >= m_Limit)
- {
- // Limit reached, ignore
+ IoHash AttHash = Att.GetHash();
- return false;
+ if (Att.IsCompressedBinary())
+ {
+ SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress();
+ OutputBytes += Decompressed.GetSize();
+ zen::WriteFile(AttDir / AttHash.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
+ }
+ else
+ {
+ SharedBuffer Binary = Att.AsBinary();
+ OutputBytes += Binary.GetSize();
+ zen::WriteFile(AttDir / AttHash.ToHexString(), IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize()));
+ }
+ }
}
- if (OffsetCounter && OffsetCounter--)
+ if (!m_Config.Quiet)
{
- // Still in offset, ignore
-
- return false;
+ ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)", m_Config.OutputPath.string(), CompleteLsn, OutputAttachments);
}
+ }
+ else
+ {
+ CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage);
+ zen::WriteFile(m_Config.OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized));
- if (--StrideCounter == 0)
+ for (const CbAttachment& Att : ResultPackage.GetAttachments())
{
- StrideCounter = m_Stride;
-
- return true;
+ ++OutputAttachments;
+ OutputBytes += Att.AsBinary().GetSize();
}
- return false;
- };
-
- int TargetParallelism = 8;
+ if (!m_Config.Quiet)
+ {
+ ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_Config.OutputPath.string(), CompleteLsn);
+ }
+ }
- if (OffsetCounter || StrideCounter || m_Limit)
+ std::lock_guard Lock(m_SummaryLock);
+ if (auto It2 = m_SummaryEntries.find(CompleteLsn); It2 != m_SummaryEntries.end())
{
- TargetParallelism = 1;
+ It2->second.OutputAttachments = OutputAttachments;
+ It2->second.OutputBytes = OutputBytes;
}
+}
- std::atomic<int> RecordingIndex{0};
+void
+ExecSessionRunner::SaveActionOutput(int32_t Lsn, int RecordingIndex, const IoHash& ActionId, const CbObject& ActionObject)
+{
+ ActionSummaryEntry Entry;
+ Entry.Lsn = Lsn;
+ Entry.RecordingIndex = RecordingIndex;
+ Entry.ActionId = ActionId;
+ Entry.FunctionName = std::string(ActionObject["Function"sv].AsString());
- m_RecordingReader->IterateActions(
- [&](CbObject ActionObject, const IoHash& ActionId) {
- // Enqueue job
+ if (!m_Config.Binary)
+ {
+ // Write action object as YAML
+ ExtendableStringBuilder<4096> YamlStr;
+ CompactBinaryToYaml(ActionObject, YamlStr);
- const int CurrentRecordingIndex = RecordingIndex++;
+ std::string_view Yaml = YamlStr;
+ zen::WriteFile(m_Config.OutputPath / fmt::format("{}.action.yaml", Lsn), IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
- Stopwatch SubmitTimer;
+ // Write decompressed input attachments
+ std::filesystem::path AttDir = m_Config.OutputPath / fmt::format("{}.action.attachments", Lsn);
+ bool AttDirCreated = false;
- const int Priority = 0;
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachCid = Field.AsAttachment();
+ ++Entry.InputAttachments;
- if (ShouldSchedule())
+ if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachCid))
{
- if (m_VerboseLogging)
- {
- int AttachmentCount = 0;
- uint64_t AttachmentBytes = 0;
- eastl::hash_set<IoHash> ReferencedChunks;
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
+ SharedBuffer Decompressed = Compressed.Decompress();
- ActionObject.IterateAttachments([&](CbFieldView Field) {
- IoHash AttachData = Field.AsAttachment();
+ Entry.InputBytes += Decompressed.GetSize();
- ReferencedChunks.insert(AttachData);
- ++AttachmentCount;
-
- if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
- {
- AttachmentBytes += ChunkData.GetSize();
- }
- });
-
- zen::ExtendableStringBuilder<1024> ObjStr;
- zen::CompactBinaryToJson(ActionObject, ObjStr);
- ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}",
- ActionId,
- AttachmentCount,
- NiceBytes(AttachmentBytes),
- ObjStr);
- }
-
- if (m_DumpActions)
+ if (!AttDirCreated)
{
- int AttachmentCount = 0;
- uint64_t AttachmentBytes = 0;
-
- ActionObject.IterateAttachments([&](CbFieldView Field) {
- IoHash AttachData = Field.AsAttachment();
-
- ++AttachmentCount;
-
- if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
- {
- AttachmentBytes += ChunkData.GetSize();
- }
- });
-
- zen::ExtendableStringBuilder<1024> ObjStr;
- zen::CompactBinaryToYaml(ActionObject, ObjStr);
- ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr);
+ zen::CreateDirectories(AttDir);
+ AttDirCreated = true;
}
- if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult =
- ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority))
- {
- const int32_t LsnField = EnqueueResult.Lsn;
-
- --RemainingWorkItems;
- ++SubmittedWorkItems;
-
- if (!m_QuietLogging)
- {
- ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
- SubmittedWorkItems,
- LsnField,
- NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
- RemainingWorkItems);
- }
-
- if (!m_OutputPath.empty())
- {
- ActionSummaryEntry Entry;
- Entry.Lsn = LsnField;
- Entry.RecordingIndex = CurrentRecordingIndex;
- Entry.ActionId = ActionId;
- Entry.FunctionName = std::string(ActionObject["Function"sv].AsString());
-
- if (!m_Binary)
- {
- // Write action object as YAML
- ExtendableStringBuilder<4096> YamlStr;
- CompactBinaryToYaml(ActionObject, YamlStr);
-
- std::string_view Yaml = YamlStr;
- zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField),
- IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
-
- // Write decompressed input attachments
- std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField);
- bool AttDirCreated = false;
-
- ActionObject.IterateAttachments([&](CbFieldView Field) {
- IoHash AttachCid = Field.AsAttachment();
- ++Entry.InputAttachments;
-
- if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid))
- {
- IoHash RawHash;
- uint64_t RawSize = 0;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
- SharedBuffer Decompressed = Compressed.Decompress();
-
- Entry.InputBytes += Decompressed.GetSize();
-
- if (!AttDirCreated)
- {
- zen::CreateDirectories(AttDir);
- AttDirCreated = true;
- }
-
- zen::WriteFile(AttDir / AttachCid.ToHexString(),
- IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
- }
- });
-
- if (!m_QuietLogging)
- {
- ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)",
- m_OutputPath.string(),
- LsnField,
- Entry.InputAttachments);
- }
- }
- else
- {
- // Build a CbPackage from the action and write as .pkg
- CbPackage ActionPackage;
- ActionPackage.SetObject(ActionObject);
-
- ActionObject.IterateAttachments([&](CbFieldView Field) {
- IoHash AttachCid = Field.AsAttachment();
- ++Entry.InputAttachments;
-
- if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid))
- {
- IoHash RawHash;
- uint64_t RawSize = 0;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
-
- Entry.InputBytes += ChunkData.GetSize();
- ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
- }
- });
-
- CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage);
- zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized));
-
- if (!m_QuietLogging)
- {
- ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField);
- }
- }
-
- std::lock_guard Lock(SummaryLock);
- SummaryEntries.emplace(LsnField, std::move(Entry));
- }
+ zen::WriteFile(AttDir / AttachCid.ToHexString(), IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
+ }
+ });
- PendingJobs.Insert(LsnField);
- }
- else
- {
- if (!m_QuietLogging)
- {
- std::string_view FunctionName = ActionObject["Function"sv].AsString();
- const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid();
- const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+ if (!m_Config.Quiet)
+ {
+ ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", m_Config.OutputPath.string(), Lsn, Entry.InputAttachments);
+ }
+ }
+ else
+ {
+ // Build a CbPackage from the action and write as .pkg
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionObject);
- ZEN_ERROR(
- "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work "
- "descriptor "
- "at: 'file://{}'",
- std::string(FunctionName),
- FunctionVersion,
- BuildSystemVersion,
- "<null>");
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachCid = Field.AsAttachment();
+ ++Entry.InputAttachments;
- EmitFunctionListOnce(m_FunctionList);
- }
+ if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachCid))
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
- ++FailedWorkCounter;
- }
+ Entry.InputBytes += ChunkData.GetSize();
+ ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
}
+ });
- // Check for completed work
+ CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage);
+ zen::WriteFile(m_Config.OutputPath / fmt::format("{}.action.pkg", Lsn), std::move(Serialized));
- DrainCompletedJobs();
- SendOrchestratorHeartbeat();
- },
- TargetParallelism);
+ if (!m_Config.Quiet)
+ {
+ ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_Config.OutputPath.string(), Lsn);
+ }
+ }
- // Wait until all pending work is complete
+ std::lock_guard Lock(m_SummaryLock);
+ m_SummaryEntries.emplace(Lsn, std::move(Entry));
+}
- while (!PendingJobs.IsEmpty())
+void
+ExecSessionRunner::WriteSummaryFiles()
+{
+ if (m_Config.OutputPath.empty() || m_SummaryEntries.empty())
{
- // TODO: improve this logic
- zen::Sleep(500);
-
- DrainCompletedJobs();
- SendOrchestratorHeartbeat();
+ return;
}
// Merge timing data from queue history into summary entries
- if (!SummaryEntries.empty())
+ // RunnerAction::State indices (can't include functionrunner.h from here)
+ constexpr int kStateNew = 0;
+ constexpr int kStatePending = 1;
+ constexpr int kStateRunning = 3;
+ constexpr int kStateCompleted = 4; // first terminal state
+ constexpr int kStateCount = 8;
+
+ for (const auto& HistEntry : m_Session.GetQueueHistory(m_QueueId, 0))
{
- // RunnerAction::State indices (can't include functionrunner.h from here)
- constexpr int kStateNew = 0;
- constexpr int kStatePending = 1;
- constexpr int kStateRunning = 3;
- constexpr int kStateCompleted = 4; // first terminal state
- constexpr int kStateCount = 8;
-
- for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0))
+ std::lock_guard Lock(m_SummaryLock);
+ if (auto It = m_SummaryEntries.find(HistEntry.Lsn); It != m_SummaryEntries.end())
{
- std::lock_guard Lock(SummaryLock);
- if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end())
+ // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled)
+ uint64_t EndTick = 0;
+ for (int S = kStateCompleted; S < kStateCount; ++S)
{
- // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled)
- uint64_t EndTick = 0;
- for (int S = kStateCompleted; S < kStateCount; ++S)
+ if (HistEntry.Timestamps[S] != 0)
{
- if (HistEntry.Timestamps[S] != 0)
- {
- EndTick = HistEntry.Timestamps[S];
- break;
- }
+ EndTick = HistEntry.Timestamps[S];
+ break;
}
- uint64_t StartTick = HistEntry.Timestamps[kStateNew];
- if (EndTick > StartTick)
- {
- It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond));
- }
- It->second.CpuSeconds = HistEntry.CpuSeconds;
- It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending];
- It->second.StartedTicks = HistEntry.Timestamps[kStateRunning];
- It->second.ExecutionLocation = HistEntry.ExecutionLocation;
}
+ uint64_t StartTick = HistEntry.Timestamps[kStateNew];
+ if (EndTick > StartTick)
+ {
+ It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond));
+ }
+ It->second.CpuSeconds = HistEntry.CpuSeconds;
+ It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending];
+ It->second.StartedTicks = HistEntry.Timestamps[kStateRunning];
+ It->second.ExecutionLocation = HistEntry.ExecutionLocation;
}
}
- // Write summary file if output path is set
+ // Sort entries by recording index
- if (!m_OutputPath.empty() && !SummaryEntries.empty())
+ std::vector<ActionSummaryEntry> Sorted;
+ Sorted.reserve(m_SummaryEntries.size());
+ for (auto& [_, Entry] : m_SummaryEntries)
{
- std::vector<ActionSummaryEntry> Sorted;
- Sorted.reserve(SummaryEntries.size());
- for (auto& [_, Entry] : SummaryEntries)
- {
- Sorted.push_back(std::move(Entry));
- }
-
- std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) {
- return A.RecordingIndex < B.RecordingIndex;
- });
+ Sorted.push_back(std::move(Entry));
+ }
- auto FormatTimestamp = [](uint64_t Ticks) -> std::string {
- if (Ticks == 0)
- {
- return "-";
- }
- return DateTime(Ticks).ToString("%H:%M:%S.%s");
- };
-
- ExtendableStringBuilder<4096> Summary;
- Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n",
- "LSN",
- "Index",
- "ActionId",
- "Function",
- "InAtt",
- "InBytes",
- "OutAtt",
- "OutBytes",
- "Wall(s)",
- "CPU(s)",
- "Submitted",
- "Started",
- "Location"));
- Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- ""));
+ std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) {
+ return A.RecordingIndex < B.RecordingIndex;
+ });
- for (const ActionSummaryEntry& Entry : Sorted)
+ auto FormatTimestamp = [](uint64_t Ticks) -> std::string {
+ if (Ticks == 0)
{
- Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n",
- Entry.Lsn,
- Entry.RecordingIndex,
- Entry.ActionId,
- Entry.FunctionName,
- Entry.InputAttachments,
- NiceBytes(Entry.InputBytes),
- Entry.OutputAttachments,
- NiceBytes(Entry.OutputBytes),
- Entry.WallSeconds,
- Entry.CpuSeconds,
- FormatTimestamp(Entry.SubmittedTicks),
- FormatTimestamp(Entry.StartedTicks),
- Entry.ExecutionLocation));
+ return "-";
}
+ return DateTime(Ticks).ToString("%H:%M:%S.%s");
+ };
- std::filesystem::path SummaryPath = m_OutputPath / "summary.txt";
- std::string_view SummaryStr = Summary;
- zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size()));
+ // Write summary.txt
+
+ ExtendableStringBuilder<4096> Summary;
+ Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n",
+ "LSN",
+ "Index",
+ "ActionId",
+ "Function",
+ "InAtt",
+ "InBytes",
+ "OutAtt",
+ "OutBytes",
+ "Wall(s)",
+ "CPU(s)",
+ "Submitted",
+ "Started",
+ "Location"));
+ Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ ""));
+
+ for (const ActionSummaryEntry& Entry : Sorted)
+ {
+ Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n",
+ Entry.Lsn,
+ Entry.RecordingIndex,
+ Entry.ActionId,
+ Entry.FunctionName,
+ Entry.InputAttachments,
+ NiceBytes(Entry.InputBytes),
+ Entry.OutputAttachments,
+ NiceBytes(Entry.OutputBytes),
+ Entry.WallSeconds,
+ Entry.CpuSeconds,
+ FormatTimestamp(Entry.SubmittedTicks),
+ FormatTimestamp(Entry.StartedTicks),
+ Entry.ExecutionLocation));
+ }
- ZEN_CONSOLE("wrote summary to {}", SummaryPath.string());
+ std::filesystem::path SummaryPath = m_Config.OutputPath / "summary.txt";
+ std::string_view SummaryStr = Summary;
+ zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size()));
- if (!m_Binary)
- {
- auto EscapeHtml = [](std::string_view Input) -> std::string {
- std::string Out;
- Out.reserve(Input.size());
- for (char C : Input)
- {
- switch (C)
- {
- case '&':
- Out += "&amp;";
- break;
- case '<':
- Out += "&lt;";
- break;
- case '>':
- Out += "&gt;";
- break;
- case '"':
- Out += "&quot;";
- break;
- case '\'':
- Out += "&#39;";
- break;
- default:
- Out += C;
- }
- }
- return Out;
- };
+ ZEN_CONSOLE("wrote summary to {}", SummaryPath.string());
- auto EscapeJson = [](std::string_view Input) -> std::string {
- std::string Out;
- Out.reserve(Input.size());
- for (char C : Input)
- {
- switch (C)
- {
- case '"':
- Out += "\\\"";
- break;
- case '\\':
- Out += "\\\\";
- break;
- case '\n':
- Out += "\\n";
- break;
- case '\r':
- Out += "\\r";
- break;
- case '\t':
- Out += "\\t";
- break;
- default:
- if (static_cast<unsigned char>(C) < 0x20)
- {
- Out += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(C)));
- }
- else
- {
- Out += C;
- }
- }
- }
- return Out;
- };
+ // Write summary.html
- ExtendableStringBuilder<8192> Html;
+ if (!m_Config.Binary)
+ {
+ ExtendableStringBuilder<8192> Html;
- Html.Append(std::string_view(R"(<!DOCTYPE html>
+ Html.Append(std::string_view(R"(<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>Exec Summary</title>
<style>
body{font-family:system-ui,sans-serif;margin:20px;background:#fafafa}
@@ -1007,51 +682,50 @@ a:hover{text-decoration:underline}
const DATA=[
)"));
- std::string_view ResultExt = ".result.yaml";
- std::string_view ActionExt = ".action.yaml";
+ std::string_view ResultExt = ".result.yaml";
+ std::string_view ActionExt = ".action.yaml";
- for (const ActionSummaryEntry& Entry : Sorted)
+ for (const ActionSummaryEntry& Entry : Sorted)
+ {
+ std::string SafeName = EscapeJson(EscapeHtml(Entry.FunctionName));
+ std::string ActionIdStr = Entry.ActionId.ToHexString();
+ std::string ActionLink;
+ if (!ActionExt.empty())
{
- std::string SafeName = EscapeJson(EscapeHtml(Entry.FunctionName));
- std::string ActionIdStr = Entry.ActionId.ToHexString();
- std::string ActionLink;
- if (!ActionExt.empty())
- {
- ActionLink = EscapeJson(fmt::format(" <a href=\"{}{}\">[action]</a>", Entry.Lsn, ActionExt));
- }
-
- // Indices: 0=lsn, 1=idx, 2=actionId, 3=fn, 4=inAtt, 5=inBytes, 6=outAtt, 7=outBytes,
- // 8=wall, 9=cpu, 10=niceBytesIn, 11=niceBytesOut, 12=actionLink,
- // 13=submittedTicks, 14=startedTicks, 15=submittedDisplay, 16=startedDisplay,
- // 17=location
- Html.Append(
- fmt::format("[{},{},\"{}\",\"{}\",{},{},{},{},{:.6f},{:.6f},\"{}\",\"{}\",\"{}\",{},{},\"{}\",\"{}\",\"{}\"],\n",
- Entry.Lsn,
- Entry.RecordingIndex,
- ActionIdStr,
- SafeName,
- Entry.InputAttachments,
- Entry.InputBytes,
- Entry.OutputAttachments,
- Entry.OutputBytes,
- Entry.WallSeconds,
- Entry.CpuSeconds,
- EscapeJson(NiceBytes(Entry.InputBytes)),
- EscapeJson(NiceBytes(Entry.OutputBytes)),
- ActionLink,
- Entry.SubmittedTicks,
- Entry.StartedTicks,
- FormatTimestamp(Entry.SubmittedTicks),
- FormatTimestamp(Entry.StartedTicks),
- EscapeJson(EscapeHtml(Entry.ExecutionLocation))));
+ ActionLink = EscapeJson(fmt::format(" <a href=\"{}{}\">[action]</a>", Entry.Lsn, ActionExt));
}
- Html.Append(fmt::format(R"(];
+ // Indices: 0=lsn, 1=idx, 2=actionId, 3=fn, 4=inAtt, 5=inBytes, 6=outAtt, 7=outBytes,
+ // 8=wall, 9=cpu, 10=niceBytesIn, 11=niceBytesOut, 12=actionLink,
+ // 13=submittedTicks, 14=startedTicks, 15=submittedDisplay, 16=startedDisplay,
+ // 17=location
+ Html.Append(fmt::format("[{},{},\"{}\",\"{}\",{},{},{},{},{:.6f},{:.6f},\"{}\",\"{}\",\"{}\",{},{},\"{}\",\"{}\",\"{}\"],\n",
+ Entry.Lsn,
+ Entry.RecordingIndex,
+ ActionIdStr,
+ SafeName,
+ Entry.InputAttachments,
+ Entry.InputBytes,
+ Entry.OutputAttachments,
+ Entry.OutputBytes,
+ Entry.WallSeconds,
+ Entry.CpuSeconds,
+ EscapeJson(NiceBytes(Entry.InputBytes)),
+ EscapeJson(NiceBytes(Entry.OutputBytes)),
+ ActionLink,
+ Entry.SubmittedTicks,
+ Entry.StartedTicks,
+ FormatTimestamp(Entry.SubmittedTicks),
+ FormatTimestamp(Entry.StartedTicks),
+ EscapeJson(EscapeHtml(Entry.ExecutionLocation))));
+ }
+
+ Html.Append(fmt::format(R"(];
const RESULT_EXT="{}";
)",
- ResultExt));
+ ResultExt));
- Html.Append(std::string_view(R"JS((function(){
+ Html.Append(std::string_view(R"JS((function(){
const ROW_H=33,BUF=20;
const container=document.getElementById("container");
const tbody=container.querySelector("tbody");
@@ -1158,14 +832,244 @@ document.getElementById("csvBtn").addEventListener("click",()=>{
</script></body></html>
)JS"));
- std::filesystem::path HtmlPath = m_OutputPath / "summary.html";
- std::string_view HtmlStr = Html;
- zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size()));
+ std::filesystem::path HtmlPath = m_Config.OutputPath / "summary.html";
+ std::string_view HtmlStr = Html;
+ zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size()));
+
+ ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string());
+ }
+}
+
+void
+ExecSessionRunner::EmitFunctionListOnce()
+{
+ if (!m_FunctionListEmittedOnce)
+ {
+ ExecCommand::EmitFunctionList(m_Config.FunctionList);
+ m_FunctionListEmittedOnce = true;
+ }
+}
+
+int
+ExecSessionRunner::Run()
+{
+ m_Session.WaitUntilReady();
+
+ // Register as a client with the orchestrator (best-effort)
+
+ m_OrchestratorClientId = RegisterOrchestratorClient();
+
+ auto ClientCleanup = MakeGuard([&] { CompleteOrchestratorClient(); });
+
+ // Create a queue to group all actions from this exec session
+
+ CbObjectWriter Metadata;
+ Metadata << "source"sv
+ << "zen-exec"sv;
+
+ auto QueueResult = m_Session.CreateQueue("zen-exec", Metadata.Save());
+ const int QueueId = QueueResult.QueueId;
+ if (!QueueId)
+ {
+ ZEN_ERROR("failed to create compute queue");
+ return 1;
+ }
+
+ m_QueueId = QueueId;
+
+ auto QueueCleanup = MakeGuard([&] { m_Session.DeleteQueue(QueueId); });
+
+ if (!m_Config.OutputPath.empty())
+ {
+ zen::CreateDirectories(m_Config.OutputPath);
+ }
+
+ // Describe workers
+
+ ZEN_CONSOLE("describing {} workers", m_Config.WorkerMap.size());
+
+ for (auto Kv : m_Config.WorkerMap)
+ {
+ CbPackage WorkerDesc = Kv.second;
+
+ m_Session.RegisterWorker(WorkerDesc);
+ }
+
+ // Then submit work items
+
+ int FailedWorkCounter = 0;
+ size_t RemainingWorkItems = m_Config.RecordingReader.GetActionCount();
+ int SubmittedWorkItems = 0;
+
+ ZEN_CONSOLE("submitting {} work items", RemainingWorkItems);
+
+ int OffsetCounter = m_Config.Offset;
+ int StrideCounter = m_Config.Stride;
- ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string());
+ auto ShouldSchedule = [&]() -> bool {
+ if (m_Config.Limit && SubmittedWorkItems >= m_Config.Limit)
+ {
+ // Limit reached, ignore
+
+ return false;
+ }
+
+ if (OffsetCounter && OffsetCounter--)
+ {
+ // Still in offset, ignore
+
+ return false;
}
+
+ if (--StrideCounter == 0)
+ {
+ StrideCounter = m_Config.Stride;
+
+ return true;
+ }
+
+ return false;
+ };
+
+ int TargetParallelism = 8;
+
+ if (OffsetCounter || StrideCounter || m_Config.Limit)
+ {
+ TargetParallelism = 1;
+ }
+
+ std::atomic<int> RecordingIndex{0};
+
+ m_Config.RecordingReader.IterateActions(
+ [&](CbObject ActionObject, const IoHash& ActionId) {
+ // Enqueue job
+
+ const int CurrentRecordingIndex = RecordingIndex++;
+
+ Stopwatch SubmitTimer;
+
+ const int Priority = 0;
+
+ if (ShouldSchedule())
+ {
+ if (m_Config.Verbose)
+ {
+ int AttachmentCount = 0;
+ uint64_t AttachmentBytes = 0;
+ eastl::hash_set<IoHash> ReferencedChunks;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachData = Field.AsAttachment();
+
+ ReferencedChunks.insert(AttachData);
+ ++AttachmentCount;
+
+ if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachData))
+ {
+ AttachmentBytes += ChunkData.GetSize();
+ }
+ });
+
+ zen::ExtendableStringBuilder<1024> ObjStr;
+ zen::CompactBinaryToJson(ActionObject, ObjStr);
+ ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}",
+ ActionId,
+ AttachmentCount,
+ NiceBytes(AttachmentBytes),
+ ObjStr);
+ }
+
+ if (m_Config.DumpActions)
+ {
+ int AttachmentCount = 0;
+ uint64_t AttachmentBytes = 0;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachData = Field.AsAttachment();
+
+ ++AttachmentCount;
+
+ if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachData))
+ {
+ AttachmentBytes += ChunkData.GetSize();
+ }
+ });
+
+ zen::ExtendableStringBuilder<1024> ObjStr;
+ zen::CompactBinaryToYaml(ActionObject, ObjStr);
+ ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr);
+ }
+
+ if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult =
+ m_Session.EnqueueActionToQueue(QueueId, ActionObject, Priority))
+ {
+ const int32_t LsnField = EnqueueResult.Lsn;
+
+ --RemainingWorkItems;
+ ++SubmittedWorkItems;
+
+ if (!m_Config.Quiet)
+ {
+ ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
+ SubmittedWorkItems,
+ LsnField,
+ NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+ RemainingWorkItems);
+ }
+
+ if (!m_Config.OutputPath.empty())
+ {
+ SaveActionOutput(LsnField, CurrentRecordingIndex, ActionId, ActionObject);
+ }
+
+ m_PendingJobs.Insert(LsnField);
+ }
+ else
+ {
+ if (!m_Config.Quiet)
+ {
+ std::string_view FunctionName = ActionObject["Function"sv].AsString();
+ const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid();
+ const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+ ZEN_ERROR(
+ "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work "
+ "descriptor "
+ "at: 'file://{}'",
+ std::string(FunctionName),
+ FunctionVersion,
+ BuildSystemVersion,
+ "<null>");
+
+ EmitFunctionListOnce();
+ }
+
+ ++FailedWorkCounter;
+ }
+ }
+
+ // Check for completed work
+
+ DrainCompletedJobs();
+ SendOrchestratorHeartbeat();
+ },
+ TargetParallelism);
+
+ // Wait until all pending work is complete
+
+ while (!m_PendingJobs.IsEmpty())
+ {
+ // TODO: improve this logic
+ zen::Sleep(500);
+
+ DrainCompletedJobs();
+ SendOrchestratorHeartbeat();
}
+ // Write summary files
+
+ WriteSummaryFiles();
+
if (FailedWorkCounter)
{
return 1;
@@ -1174,37 +1078,91 @@ document.getElementById("csvBtn").addEventListener("click",()=>{
return 0;
}
-int
-ExecCommand::LocalMessagingExecute()
+//////////////////////////////////////////////////////////////////////////
+// ExecHttpSubCmd
+
+ExecHttpSubCmd::ExecHttpSubCmd(ExecCommand& Parent) : ZenSubCmdBase("http", "Forward requests to HTTP compute service"), m_Parent(Parent)
{
- // Non-HTTP work submission path
+ SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName), "<hosturl>");
+}
+
+void
+ExecHttpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName);
- // To be reimplemented using final transport
+ ZEN_ASSERT(m_Parent.m_ChunkResolver);
+ ChunkResolver& Resolver = *m_Parent.m_ChunkResolver;
- return 0;
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+ ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName);
+
+ Stopwatch ExecTimer;
+ int ReturnValue = m_Parent.RunSession(ComputeSession);
+
+ ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
+
+ if (!ReturnValue)
+ {
+ ZEN_CONSOLE("all work items completed successfully");
+ }
+ else
+ {
+ ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
+ }
}
//////////////////////////////////////////////////////////////////////////
+// ExecInprocSubCmd
-int
-ExecCommand::HttpExecute()
+ExecInprocSubCmd::ExecInprocSubCmd(ExecCommand& Parent) : ZenSubCmdBase("inproc", "Handle execution in-process"), m_Parent(Parent)
{
- ZEN_ASSERT(m_ChunkResolver);
- ChunkResolver& Resolver = *m_ChunkResolver;
+}
- std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+void
+ExecInprocSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ ZEN_ASSERT(m_Parent.m_ChunkResolver);
+ ChunkResolver& Resolver = *m_Parent.m_ChunkResolver;
zen::compute::ComputeServiceSession ComputeSession(Resolver);
- ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName);
- return ExecUsingSession(ComputeSession);
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+ ComputeSession.AddLocalRunner(Resolver, TempPath);
+
+ Stopwatch ExecTimer;
+ int ReturnValue = m_Parent.RunSession(ComputeSession);
+
+ ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
+
+ if (!ReturnValue)
+ {
+ ZEN_CONSOLE("all work items completed successfully");
+ }
+ else
+ {
+ ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
+ }
}
-int
-ExecCommand::BeaconExecute()
+//////////////////////////////////////////////////////////////////////////
+// ExecBeaconSubCmd
+
+ExecBeaconSubCmd::ExecBeaconSubCmd(ExecCommand& Parent)
+: ZenSubCmdBase("beacon", "Execute via beacon/orchestrator discovery")
+, m_Parent(Parent)
{
- ZEN_ASSERT(m_ChunkResolver);
- ChunkResolver& Resolver = *m_ChunkResolver;
+ SubOptions().add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), "<url>");
+ SubOptions().add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>");
+}
+
+void
+ExecBeaconSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ ZEN_ASSERT(m_Parent.m_ChunkResolver);
+ ChunkResolver& Resolver = *m_Parent.m_ChunkResolver;
std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
zen::compute::ComputeServiceSession ComputeSession(Resolver);
@@ -1221,49 +1179,36 @@ ExecCommand::BeaconExecute()
ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558");
}
- return ExecUsingSession(ComputeSession);
-}
-
-//////////////////////////////////////////////////////////////////////////
+ Stopwatch ExecTimer;
+ int ReturnValue = m_Parent.RunSession(ComputeSession, m_OrchestratorUrl);
-void
-ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId)
-{
- const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid();
+ ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
- for (auto& Item : WorkerDesc["functions"sv])
+ if (!ReturnValue)
{
- CbObjectView Function = Item.AsObjectView();
-
- std::string_view FunctionName = Function["name"sv].AsString();
- const Guid FunctionVersion = Function["version"sv].AsUuid();
-
- m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
- .FunctionVersion = FunctionVersion,
- .BuildSystemVersion = WorkerBuildSystemVersion,
- .WorkerId = WorkerId});
+ ZEN_CONSOLE("all work items completed successfully");
+ }
+ else
+ {
+ ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
}
}
-void
-ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList)
-{
- if (m_FunctionListEmittedOnce == false)
- {
- EmitFunctionList(FunctionList);
+//////////////////////////////////////////////////////////////////////////
+// ExecDumpSubCmd
- m_FunctionListEmittedOnce = true;
- }
+ExecDumpSubCmd::ExecDumpSubCmd(ExecCommand& Parent) : ZenSubCmdBase("dump", "Dump high level information about actions"), m_Parent(Parent)
+{
}
-int
-ExecCommand::DumpWorkItems()
+void
+ExecDumpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
std::atomic<int> EmittedCount{0};
eastl::hash_map<IoHash, uint64_t> SeenAttachments; // Attachment CID -> count of references
- m_RecordingReader->IterateActions(
+ m_Parent.m_RecordingReader->IterateActions(
[&](CbObject ActionObject, const IoHash& ActionId) {
eastl::hash_map<IoHash, CompressedBuffer> Attachments;
@@ -1272,7 +1217,7 @@ ExecCommand::DumpWorkItems()
ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
- IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid);
+ IoBuffer AttachmentData = m_Parent.m_ChunkResolver->FindChunkByCid(AttachmentCid);
IoHash RawHash;
uint64_t RawSize = 0;
CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
@@ -1322,36 +1267,191 @@ ExecCommand::DumpWorkItems()
{
ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount);
}
+}
- return 0;
+//////////////////////////////////////////////////////////////////////////
+// ExecBuildlogSubCmd
+
+ExecBuildlogSubCmd::ExecBuildlogSubCmd(ExecCommand& Parent) : ZenSubCmdBase("buildlog", "Generate build actions log"), m_Parent(Parent)
+{
+}
+
+void
+ExecBuildlogSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
+{
+ ZEN_ASSERT(m_Parent.m_ChunkResolver);
+ ChunkResolver& Resolver = *m_Parent.m_ChunkResolver;
+
+ if (std::filesystem::exists(m_Parent.m_RecordingLogPath))
+ {
+ m_Parent.ThrowOptionError(fmt::format("recording log directory '{}' already exists!", m_Parent.m_RecordingLogPath));
+ }
+
+ ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!");
+
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+ ComputeSession.StartRecording(Resolver, m_Parent.m_RecordingLogPath);
+
+ Stopwatch ExecTimer;
+ int ReturnValue = m_Parent.RunSession(ComputeSession);
+
+ ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
+
+ if (!ReturnValue)
+ {
+ ZEN_CONSOLE("all work items completed successfully");
+ }
+ else
+ {
+ ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
+ }
}
//////////////////////////////////////////////////////////////////////////
+// ExecCommand
-int
-ExecCommand::BuildActionsLog()
+ExecCommand::ExecCommand()
{
- ZEN_ASSERT(m_ChunkResolver);
- ChunkResolver& Resolver = *m_ChunkResolver;
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("replay", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>");
+ m_Options.add_option("replay", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>");
+ m_Options.add_option("replay", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>");
+ m_Options.add_option("replay", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>");
+ m_Options.add_option("replay", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>");
+ m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>");
+ m_Options.add_option("output",
+ "",
+ "dump-actions",
+ "Dump each action to console as it is dispatched",
+ cxxopts::value(m_DumpActions),
+ "<bool>");
+ m_Options.add_option("output", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), "<path>");
+ m_Options.add_option("output", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), "<bool>");
+ m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value<std::string>(m_SubCommand)->default_value(""), "");
+ m_Options.parse_positional({"subcommand"});
+
+ AddSubCommand(m_HttpSubCmd);
+ AddSubCommand(m_InprocSubCmd);
+ AddSubCommand(m_BeaconSubCmd);
+ AddSubCommand(m_DumpSubCmd);
+ AddSubCommand(m_BuildlogSubCmd);
+}
+ExecCommand::~ExecCommand()
+{
+}
+
+bool
+ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions)
+{
if (m_RecordingPath.empty())
{
- throw OptionParseException("need to specify recording path", m_Options.help());
+ ThrowOptionError("replay path is required!");
}
- if (std::filesystem::exists(m_RecordingLogPath))
+ m_VerboseLogging = GlobalOptions.IsVerbose;
+ m_QuietLogging = m_Quiet && !m_VerboseLogging;
+
+ // Gather information from recording path
+
+ std::filesystem::path RecordingPath{m_RecordingPath};
+
+ if (!std::filesystem::is_directory(RecordingPath))
{
- throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help());
+ ThrowOptionError("replay path should be a directory path!");
+ }
+ else
+ {
+ if (std::filesystem::is_directory(RecordingPath / "cid"))
+ {
+ m_Reader = std::make_unique<zen::compute::RecordingReader>(RecordingPath);
+ m_WorkerMap = m_Reader->ReadWorkers();
+ m_ChunkResolver = m_Reader.get();
+ m_RecordingReader = m_Reader.get();
+ }
+ else
+ {
+ m_UeReader = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath);
+ m_WorkerMap = m_UeReader->ReadWorkers();
+ m_ChunkResolver = m_UeReader.get();
+ m_RecordingReader = m_UeReader.get();
+ }
}
- ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!");
+ ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount());
- std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+ for (auto& Kv : m_WorkerMap)
+ {
+ CbObject WorkerDesc = Kv.second.GetObject();
+ const IoHash& WorkerId = Kv.first;
- zen::compute::ComputeServiceSession ComputeSession(Resolver);
- ComputeSession.StartRecording(Resolver, m_RecordingLogPath);
+ RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId);
+
+ if (m_VerboseLogging)
+ {
+ zen::ExtendableStringBuilder<1024> ObjStr;
+# if 0
+ zen::CompactBinaryToJson(WorkerDesc, ObjStr);
+ ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr);
+# else
+ zen::CompactBinaryToYaml(WorkerDesc, ObjStr);
+ ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr);
+# endif
+ }
+ }
+
+ if (m_VerboseLogging)
+ {
+ EmitFunctionList(m_FunctionList);
+ }
+
+ return true;
+}
- return ExecUsingSession(ComputeSession);
+int
+ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl)
+{
+ ExecSessionConfig Config{
+ .Resolver = *m_ChunkResolver,
+ .RecordingReader = *m_RecordingReader,
+ .WorkerMap = m_WorkerMap,
+ .FunctionList = m_FunctionList,
+ .OrchestratorUrl = OrchestratorUrl,
+ .OutputPath = m_OutputPath,
+ .Offset = m_Offset,
+ .Stride = m_Stride,
+ .Limit = m_Limit,
+ .Verbose = m_VerboseLogging,
+ .Quiet = m_QuietLogging,
+ .DumpActions = m_DumpActions,
+ .Binary = m_Binary,
+ };
+
+ ExecSessionRunner Runner(ComputeSession, Config);
+ return Runner.Run();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId)
+{
+ const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid();
+
+ for (auto& Item : WorkerDesc["functions"sv])
+ {
+ CbObjectView Function = Item.AsObjectView();
+
+ std::string_view FunctionName = Function["name"sv].AsString();
+ const Guid FunctionVersion = Function["version"sv].AsUuid();
+
+ m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
+ .FunctionVersion = FunctionVersion,
+ .BuildSystemVersion = WorkerBuildSystemVersion,
+ .WorkerId = WorkerId});
+ }
}
void