From eba410c4168e23d7908827eb34b7cf0c58a5dc48 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 18 Mar 2026 11:19:10 +0100 Subject: Compute batching (#849) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Made `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extended `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives --- src/zen/cmds/exec_cmd.cpp | 1866 ++++++++++++++++++++++++--------------------- 1 file changed, 983 insertions(+), 883 deletions(-) (limited to 'src/zen/cmds/exec_cmd.cpp') diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp index cbc153e07..30e860a3f 100644 --- a/src/zen/cmds/exec_cmd.cpp +++ b/src/zen/cmds/exec_cmd.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -41,255 +42,122 @@ struct hash : public zen::IoHash::Hasher namespace zen { -ExecCommand::ExecCommand() -{ - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", kHostUrlHelp, cxxopts::value(m_HostName), ""); - m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), ""); - m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), ""); - m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), ""); - m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), ""); - m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), ""); - m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), ""); - m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), ""); - m_Options.add_option("", - "", - "mode", - "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", - cxxopts::value(m_Mode)->default_value("http"), - ""); - m_Options - .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), ""); - m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), ""); - m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), ""); - m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), ""); - m_Options.parse_positional("mode"); -} - -ExecCommand::~ExecCommand() -{ -} - -void -ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) -{ - // Configure - - if (!ParseOptions(argc, argv)) - { - return; - } - - m_HostName = ResolveTargetHostSpec(m_HostName); - - if (m_RecordingPath.empty()) - { - throw OptionParseException("replay path is required!", m_Options.help()); - } - - m_VerboseLogging = GlobalOptions.IsVerbose; - m_QuietLogging = m_Quiet && !m_VerboseLogging; +namespace { - enum ExecMode - { - kHttp, - kDirect, - kInproc, - kDump, - kBeacon, - kBuildLog - } Mode; - - if (m_Mode == "http"sv) - { - Mode = kHttp; - } - else if (m_Mode == "direct"sv) - { - Mode = kDirect; - } - else if (m_Mode == "inproc"sv) + static std::string EscapeHtml(std::string_view Input) { - Mode = kInproc; - } - else if (m_Mode == "dump"sv) - { - Mode = kDump; - } - else if (m_Mode == "beacon"sv) - { - Mode = kBeacon; - } - else if (m_Mode == "buildlog"sv) - { - Mode = kBuildLog; - } - else - { - throw OptionParseException("invalid mode specified!", m_Options.help()); - } - - // Gather information from recording path - - std::unique_ptr Reader; - std::unique_ptr UeReader; - - std::filesystem::path RecordingPath{m_RecordingPath}; - - if (!std::filesystem::is_directory(RecordingPath)) - { - throw OptionParseException("replay path should be a directory path!", m_Options.help()); - } - else - { - if (std::filesystem::is_directory(RecordingPath / "cid")) - { - Reader = std::make_unique(RecordingPath); - m_WorkerMap = Reader->ReadWorkers(); - m_ChunkResolver = Reader.get(); - m_RecordingReader = Reader.get(); - } - else + std::string Out; + Out.reserve(Input.size()); + for (char C : Input) { - UeReader = std::make_unique(RecordingPath); - m_WorkerMap = UeReader->ReadWorkers(); - m_ChunkResolver = UeReader.get(); - m_RecordingReader = UeReader.get(); + switch (C) + { + case '&': + Out += "&"; + break; + case '<': + Out += "<"; + break; + case '>': + Out += ">"; + break; + case '"': + Out += """; + break; + case '\'': + Out += "'"; + break; + default: + Out += C; + } } + return Out; } - ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); - - for (auto& Kv : m_WorkerMap) + static std::string EscapeJson(std::string_view Input) { - CbObject WorkerDesc = Kv.second.GetObject(); - const IoHash& WorkerId = Kv.first; - - RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); - - if (m_VerboseLogging) + std::string Out; + Out.reserve(Input.size()); + for (char C : Input) { - zen::ExtendableStringBuilder<1024> ObjStr; -# if 0 - zen::CompactBinaryToJson(WorkerDesc, ObjStr); - ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); -# else - zen::CompactBinaryToYaml(WorkerDesc, ObjStr); - ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); -# endif + switch (C) + { + case '"': + Out += "\\\""; + break; + case '\\': + Out += "\\\\"; + break; + case '\n': + Out += "\\n"; + break; + case '\r': + Out += "\\r"; + break; + case '\t': + Out += "\\t"; + break; + default: + if (static_cast(C) < 0x20) + { + Out += fmt::format("\\u{:04x}", static_cast(static_cast(C))); + } + else + { + Out += C; + } + } } + return Out; } - if (m_VerboseLogging) - { - EmitFunctionList(m_FunctionList); - } - - // Iterate over work items and dispatch or log them - - int ReturnValue = 0; - - Stopwatch ExecTimer; - - switch (Mode) - { - case kHttp: - // Forward requests to HTTP function service - ReturnValue = HttpExecute(); - break; - - case kDirect: - // Not currently supported - ReturnValue = LocalMessagingExecute(); - break; - - case kInproc: - // Handle execution in-core (by spawning child processes) - ReturnValue = InProcessExecute(); - break; - - case kDump: - // Dump high level information about actions to console - ReturnValue = DumpWorkItems(); - break; - - case kBeacon: - ReturnValue = BeaconExecute(); - break; - - case kBuildLog: - ReturnValue = BuildActionsLog(); - break; - - default: - ZEN_ERROR("Unknown operating mode! No work submitted"); - - ReturnValue = 1; - } - - ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); +} // namespace - if (!ReturnValue) - { - ZEN_CONSOLE("all work items completed successfully"); - } - else - { - ZEN_CONSOLE("some work items failed (code {})", ReturnValue); - } -} +////////////////////////////////////////////////////////////////////////// +// ExecSessionConfig — read-only configuration for a session run -int -ExecCommand::InProcessExecute() +struct ExecSessionConfig { - ZEN_ASSERT(m_ChunkResolver); - ChunkResolver& Resolver = *m_ChunkResolver; + zen::ChunkResolver& Resolver; + zen::compute::RecordingReaderBase& RecordingReader; + const std::unordered_map& WorkerMap; + std::vector& FunctionList; // mutable for EmitFunctionListOnce + std::string_view OrchestratorUrl; + const std::filesystem::path& OutputPath; + int Offset = 0; + int Stride = 1; + int Limit = 0; + bool Verbose = false; + bool Quiet = false; + bool DumpActions = false; + bool Binary = false; +}; - zen::compute::ComputeServiceSession ComputeSession(Resolver); +////////////////////////////////////////////////////////////////////////// +// ExecSessionRunner — owns per-run state, drives the session lifecycle - std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); - ComputeSession.AddLocalRunner(Resolver, TempPath); +class ExecSessionRunner +{ +public: + ExecSessionRunner(zen::compute::ComputeServiceSession& Session, const ExecSessionConfig& Config); + int Run(); - return ExecUsingSession(ComputeSession); -} +private: + // Types -int -ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession) -{ struct JobTracker { public: - inline void Insert(int LsnField) - { - RwLock::ExclusiveLockScope _(Lock); - PendingJobs.insert(LsnField); - } - - inline bool IsEmpty() const - { - RwLock::SharedLockScope _(Lock); - return PendingJobs.empty(); - } - - inline void Remove(int CompleteLsn) - { - RwLock::ExclusiveLockScope _(Lock); - PendingJobs.erase(CompleteLsn); - } - - inline size_t GetSize() const - { - RwLock::SharedLockScope _(Lock); - return PendingJobs.size(); - } + void Insert(int LsnField); + bool IsEmpty() const; + void Remove(int CompleteLsn); + size_t GetSize() const; private: mutable RwLock Lock; std::unordered_set PendingJobs; }; - JobTracker PendingJobs; - struct ActionSummaryEntry { int32_t Lsn = 0; @@ -307,664 +175,471 @@ ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSessio std::string ExecutionLocation; }; - std::mutex SummaryLock; - std::unordered_map SummaryEntries; + // Methods + + std::string RegisterOrchestratorClient(); + void SendOrchestratorHeartbeat(); + void CompleteOrchestratorClient(); + void DrainCompletedJobs(); + void SaveResultOutput(int32_t CompleteLsn, CbPackage& ResultPackage); + void SaveActionOutput(int32_t Lsn, int RecordingIndex, const IoHash& ActionId, const CbObject& ActionObject); + void WriteSummaryFiles(); + void EmitFunctionListOnce(); + + // State + + zen::compute::ComputeServiceSession& m_Session; + ExecSessionConfig m_Config; + JobTracker m_PendingJobs; + std::mutex m_SummaryLock; + std::unordered_map m_SummaryEntries; + int m_QueueId = 0; + std::string m_OrchestratorClientId; + Stopwatch m_OrchestratorHeartbeatTimer; + bool m_FunctionListEmittedOnce = false; + std::atomic m_IsDraining{0}; +}; - ComputeSession.WaitUntilReady(); +////////////////////////////////////////////////////////////////////////// +// ExecSessionRunner::JobTracker - // Register as a client with the orchestrator (best-effort) +void +ExecSessionRunner::JobTracker::Insert(int LsnField) +{ + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.insert(LsnField); +} + +bool +ExecSessionRunner::JobTracker::IsEmpty() const +{ + RwLock::SharedLockScope _(Lock); + return PendingJobs.empty(); +} - std::string OrchestratorClientId; +void +ExecSessionRunner::JobTracker::Remove(int CompleteLsn) +{ + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.erase(CompleteLsn); +} - if (!m_OrchestratorUrl.empty()) +size_t +ExecSessionRunner::JobTracker::GetSize() const +{ + RwLock::SharedLockScope _(Lock); + return PendingJobs.size(); +} + +////////////////////////////////////////////////////////////////////////// +// ExecSessionRunner implementation + +ExecSessionRunner::ExecSessionRunner(zen::compute::ComputeServiceSession& Session, const ExecSessionConfig& Config) +: m_Session(Session) +, m_Config(Config) +{ +} + +std::string +ExecSessionRunner::RegisterOrchestratorClient() +{ + if (m_Config.OrchestratorUrl.empty()) { - try - { - HttpClient OrchestratorClient(m_OrchestratorUrl); + return {}; + } - CbObjectWriter Ann; - Ann << "session_id"sv << GetSessionId(); - Ann << "hostname"sv << std::string_view(GetMachineName()); + try + { + HttpClient OrchestratorClient(m_Config.OrchestratorUrl); - CbObjectWriter Meta; - Meta << "source"sv - << "zen-exec"sv; - Ann << "metadata"sv << Meta.Save(); + CbObjectWriter Ann; + Ann << "session_id"sv << GetSessionId(); + Ann << "hostname"sv << std::string_view(GetMachineName()); - auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save()); - if (Resp.IsSuccess()) - { - OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString()); - ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId); - } - else - { - ZEN_WARN("failed to register with orchestrator (status {})", static_cast(Resp.StatusCode)); - } + CbObjectWriter Meta; + Meta << "source"sv + << "zen-exec"sv; + Ann << "metadata"sv << Meta.Save(); + + auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save()); + if (Resp.IsSuccess()) + { + std::string ClientId = std::string(Resp.AsObject()["id"].AsString()); + ZEN_CONSOLE_INFO("registered with orchestrator as {}", ClientId); + return ClientId; } - catch (const std::exception& Ex) + else { - ZEN_WARN("failed to register with orchestrator: {}", Ex.what()); + ZEN_WARN("failed to register with orchestrator (status {})", static_cast(Resp.StatusCode)); } } + catch (const std::exception& Ex) + { + ZEN_WARN("failed to register with orchestrator: {}", Ex.what()); + } - Stopwatch OrchestratorHeartbeatTimer; + return {}; +} - auto SendOrchestratorHeartbeat = [&] { - if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000) - { - return; - } - OrchestratorHeartbeatTimer.Reset(); +void +ExecSessionRunner::SendOrchestratorHeartbeat() +{ + if (m_OrchestratorClientId.empty() || m_OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000) + { + return; + } + m_OrchestratorHeartbeatTimer.Reset(); + try + { + HttpClient OrchestratorClient(m_Config.OrchestratorUrl); + std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", m_OrchestratorClientId)); + } + catch (...) + { + } +} + +void +ExecSessionRunner::CompleteOrchestratorClient() +{ + if (!m_OrchestratorClientId.empty()) + { try { - HttpClient OrchestratorClient(m_OrchestratorUrl); - std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId)); + HttpClient OrchestratorClient(m_Config.OrchestratorUrl); + std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", m_OrchestratorClientId)); } catch (...) { } - }; - - auto ClientCleanup = MakeGuard([&] { - if (!OrchestratorClientId.empty()) - { - try - { - HttpClient OrchestratorClient(m_OrchestratorUrl); - std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId)); - } - catch (...) - { - } - } - }); - - // Create a queue to group all actions from this exec session - - CbObjectWriter Metadata; - Metadata << "source"sv - << "zen-exec"sv; - - auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save()); - const int QueueId = QueueResult.QueueId; - if (!QueueId) - { - ZEN_ERROR("failed to create compute queue"); - return 1; } +} - auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); }); - - if (!m_OutputPath.empty()) +void +ExecSessionRunner::DrainCompletedJobs() +{ + if (m_IsDraining.exchange(1)) { - zen::CreateDirectories(m_OutputPath); + return; } - std::atomic IsDraining{0}; - - auto DrainCompletedJobs = [&] { - if (IsDraining.exchange(1)) - { - return; - } - - auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); + auto _ = MakeGuard([&] { m_IsDraining.store(0, std::memory_order_release); }); - CbObjectWriter Cbo; - ComputeSession.GetQueueCompleted(QueueId, Cbo); + CbObjectWriter Cbo; + m_Session.GetQueueCompleted(m_QueueId, Cbo); - if (CbObject Completed = Cbo.Save()) + if (CbObject Completed = Cbo.Save()) + { + for (auto& It : Completed["completed"sv]) { - for (auto& It : Completed["completed"sv]) - { - int32_t CompleteLsn = It.AsInt32(); + int32_t CompleteLsn = It.AsInt32(); - CbPackage ResultPackage; - HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); + CbPackage ResultPackage; + HttpResponseCode Response = m_Session.GetActionResult(CompleteLsn, /* out */ ResultPackage); - if (Response == HttpResponseCode::OK) + if (Response == HttpResponseCode::OK) + { + if (!m_Config.OutputPath.empty() && ResultPackage) { - if (!m_OutputPath.empty() && ResultPackage) - { - int OutputAttachments = 0; - uint64_t OutputBytes = 0; - - if (!m_Binary) - { - // Write the root object as YAML - ExtendableStringBuilder<4096> YamlStr; - CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr); - - std::string_view Yaml = YamlStr; - zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn), - IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); - - // Write decompressed attachments - auto Attachments = ResultPackage.GetAttachments(); - - if (!Attachments.empty()) - { - std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn); - zen::CreateDirectories(AttDir); - - for (const CbAttachment& Att : Attachments) - { - ++OutputAttachments; - - IoHash AttHash = Att.GetHash(); - - if (Att.IsCompressedBinary()) - { - SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress(); - OutputBytes += Decompressed.GetSize(); - zen::WriteFile(AttDir / AttHash.ToHexString(), - IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); - } - else - { - SharedBuffer Binary = Att.AsBinary(); - OutputBytes += Binary.GetSize(); - zen::WriteFile(AttDir / AttHash.ToHexString(), - IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize())); - } - } - } - - if (!m_QuietLogging) - { - ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)", - m_OutputPath.string(), - CompleteLsn, - OutputAttachments); - } - } - else - { - CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage); - zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized)); - - for (const CbAttachment& Att : ResultPackage.GetAttachments()) - { - ++OutputAttachments; - OutputBytes += Att.AsBinary().GetSize(); - } - - if (!m_QuietLogging) - { - ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn); - } - } - - std::lock_guard Lock(SummaryLock); - if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end()) - { - It2->second.OutputAttachments = OutputAttachments; - It2->second.OutputBytes = OutputBytes; - } - } + SaveResultOutput(CompleteLsn, ResultPackage); + } - PendingJobs.Remove(CompleteLsn); + m_PendingJobs.Remove(CompleteLsn); - ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); - } + ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize()); } } - }; - - // Describe workers + } +} - ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); +void +ExecSessionRunner::SaveResultOutput(int32_t CompleteLsn, CbPackage& ResultPackage) +{ + int OutputAttachments = 0; + uint64_t OutputBytes = 0; - for (auto Kv : m_WorkerMap) + if (!m_Config.Binary) { - CbPackage WorkerDesc = Kv.second; + // Write the root object as YAML + ExtendableStringBuilder<4096> YamlStr; + CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr); - ComputeSession.RegisterWorker(WorkerDesc); - } - - // Then submit work items + std::string_view Yaml = YamlStr; + zen::WriteFile(m_Config.OutputPath / fmt::format("{}.result.yaml", CompleteLsn), + IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); - int FailedWorkCounter = 0; - size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); - int SubmittedWorkItems = 0; + // Write decompressed attachments + auto Attachments = ResultPackage.GetAttachments(); - ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + if (!Attachments.empty()) + { + std::filesystem::path AttDir = m_Config.OutputPath / fmt::format("{}.result.attachments", CompleteLsn); + zen::CreateDirectories(AttDir); - int OffsetCounter = m_Offset; - int StrideCounter = m_Stride; + for (const CbAttachment& Att : Attachments) + { + ++OutputAttachments; - auto ShouldSchedule = [&]() -> bool { - if (m_Limit && SubmittedWorkItems >= m_Limit) - { - // Limit reached, ignore + IoHash AttHash = Att.GetHash(); - return false; + if (Att.IsCompressedBinary()) + { + SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress(); + OutputBytes += Decompressed.GetSize(); + zen::WriteFile(AttDir / AttHash.ToHexString(), + IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); + } + else + { + SharedBuffer Binary = Att.AsBinary(); + OutputBytes += Binary.GetSize(); + zen::WriteFile(AttDir / AttHash.ToHexString(), IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize())); + } + } } - if (OffsetCounter && OffsetCounter--) + if (!m_Config.Quiet) { - // Still in offset, ignore - - return false; + ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)", m_Config.OutputPath.string(), CompleteLsn, OutputAttachments); } + } + else + { + CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage); + zen::WriteFile(m_Config.OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized)); - if (--StrideCounter == 0) + for (const CbAttachment& Att : ResultPackage.GetAttachments()) { - StrideCounter = m_Stride; - - return true; + ++OutputAttachments; + OutputBytes += Att.AsBinary().GetSize(); } - return false; - }; - - int TargetParallelism = 8; + if (!m_Config.Quiet) + { + ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_Config.OutputPath.string(), CompleteLsn); + } + } - if (OffsetCounter || StrideCounter || m_Limit) + std::lock_guard Lock(m_SummaryLock); + if (auto It2 = m_SummaryEntries.find(CompleteLsn); It2 != m_SummaryEntries.end()) { - TargetParallelism = 1; + It2->second.OutputAttachments = OutputAttachments; + It2->second.OutputBytes = OutputBytes; } +} - std::atomic RecordingIndex{0}; - - m_RecordingReader->IterateActions( - [&](CbObject ActionObject, const IoHash& ActionId) { - // Enqueue job - - const int CurrentRecordingIndex = RecordingIndex++; - - Stopwatch SubmitTimer; - - const int Priority = 0; - - if (ShouldSchedule()) - { - if (m_VerboseLogging) - { - int AttachmentCount = 0; - uint64_t AttachmentBytes = 0; - eastl::hash_set ReferencedChunks; - - ActionObject.IterateAttachments([&](CbFieldView Field) { - IoHash AttachData = Field.AsAttachment(); - - ReferencedChunks.insert(AttachData); - ++AttachmentCount; - - if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) - { - AttachmentBytes += ChunkData.GetSize(); - } - }); - - zen::ExtendableStringBuilder<1024> ObjStr; - zen::CompactBinaryToJson(ActionObject, ObjStr); - ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", - ActionId, - AttachmentCount, - NiceBytes(AttachmentBytes), - ObjStr); - } - - if (m_DumpActions) - { - int AttachmentCount = 0; - uint64_t AttachmentBytes = 0; - - ActionObject.IterateAttachments([&](CbFieldView Field) { - IoHash AttachData = Field.AsAttachment(); - - ++AttachmentCount; +void +ExecSessionRunner::SaveActionOutput(int32_t Lsn, int RecordingIndex, const IoHash& ActionId, const CbObject& ActionObject) +{ + ActionSummaryEntry Entry; + Entry.Lsn = Lsn; + Entry.RecordingIndex = RecordingIndex; + Entry.ActionId = ActionId; + Entry.FunctionName = std::string(ActionObject["Function"sv].AsString()); - if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) - { - AttachmentBytes += ChunkData.GetSize(); - } - }); + if (!m_Config.Binary) + { + // Write action object as YAML + ExtendableStringBuilder<4096> YamlStr; + CompactBinaryToYaml(ActionObject, YamlStr); - zen::ExtendableStringBuilder<1024> ObjStr; - zen::CompactBinaryToYaml(ActionObject, ObjStr); - ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); - } + std::string_view Yaml = YamlStr; + zen::WriteFile(m_Config.OutputPath / fmt::format("{}.action.yaml", Lsn), IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); - if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult = - ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority)) - { - const int32_t LsnField = EnqueueResult.Lsn; + // Write decompressed input attachments + std::filesystem::path AttDir = m_Config.OutputPath / fmt::format("{}.action.attachments", Lsn); + bool AttDirCreated = false; - --RemainingWorkItems; - ++SubmittedWorkItems; + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachCid = Field.AsAttachment(); + ++Entry.InputAttachments; - if (!m_QuietLogging) - { - ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", - SubmittedWorkItems, - LsnField, - NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), - RemainingWorkItems); - } + if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachCid)) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); + SharedBuffer Decompressed = Compressed.Decompress(); - if (!m_OutputPath.empty()) - { - ActionSummaryEntry Entry; - Entry.Lsn = LsnField; - Entry.RecordingIndex = CurrentRecordingIndex; - Entry.ActionId = ActionId; - Entry.FunctionName = std::string(ActionObject["Function"sv].AsString()); + Entry.InputBytes += Decompressed.GetSize(); - if (!m_Binary) - { - // Write action object as YAML - ExtendableStringBuilder<4096> YamlStr; - CompactBinaryToYaml(ActionObject, YamlStr); - - std::string_view Yaml = YamlStr; - zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField), - IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size())); - - // Write decompressed input attachments - std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField); - bool AttDirCreated = false; - - ActionObject.IterateAttachments([&](CbFieldView Field) { - IoHash AttachCid = Field.AsAttachment(); - ++Entry.InputAttachments; - - if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) - { - IoHash RawHash; - uint64_t RawSize = 0; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); - SharedBuffer Decompressed = Compressed.Decompress(); - - Entry.InputBytes += Decompressed.GetSize(); - - if (!AttDirCreated) - { - zen::CreateDirectories(AttDir); - AttDirCreated = true; - } - - zen::WriteFile(AttDir / AttachCid.ToHexString(), - IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); - } - }); - - if (!m_QuietLogging) - { - ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", - m_OutputPath.string(), - LsnField, - Entry.InputAttachments); - } - } - else - { - // Build a CbPackage from the action and write as .pkg - CbPackage ActionPackage; - ActionPackage.SetObject(ActionObject); - - ActionObject.IterateAttachments([&](CbFieldView Field) { - IoHash AttachCid = Field.AsAttachment(); - ++Entry.InputAttachments; - - if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid)) - { - IoHash RawHash; - uint64_t RawSize = 0; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); - - Entry.InputBytes += ChunkData.GetSize(); - ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); - } - }); - - CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage); - zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized)); - - if (!m_QuietLogging) - { - ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField); - } - } + if (!AttDirCreated) + { + zen::CreateDirectories(AttDir); + AttDirCreated = true; + } - std::lock_guard Lock(SummaryLock); - SummaryEntries.emplace(LsnField, std::move(Entry)); - } + zen::WriteFile(AttDir / AttachCid.ToHexString(), IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize())); + } + }); - PendingJobs.Insert(LsnField); - } - else - { - if (!m_QuietLogging) - { - std::string_view FunctionName = ActionObject["Function"sv].AsString(); - const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); - const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + if (!m_Config.Quiet) + { + ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)", m_Config.OutputPath.string(), Lsn, Entry.InputAttachments); + } + } + else + { + // Build a CbPackage from the action and write as .pkg + CbPackage ActionPackage; + ActionPackage.SetObject(ActionObject); - ZEN_ERROR( - "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " - "descriptor " - "at: 'file://{}'", - std::string(FunctionName), - FunctionVersion, - BuildSystemVersion, - ""); + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachCid = Field.AsAttachment(); + ++Entry.InputAttachments; - EmitFunctionListOnce(m_FunctionList); - } + if (IoBuffer ChunkData = m_Config.Resolver.FindChunkByCid(AttachCid)) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize); - ++FailedWorkCounter; - } + Entry.InputBytes += ChunkData.GetSize(); + ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); } + }); - // Check for completed work + CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage); + zen::WriteFile(m_Config.OutputPath / fmt::format("{}.action.pkg", Lsn), std::move(Serialized)); - DrainCompletedJobs(); - SendOrchestratorHeartbeat(); - }, - TargetParallelism); + if (!m_Config.Quiet) + { + ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_Config.OutputPath.string(), Lsn); + } + } - // Wait until all pending work is complete + std::lock_guard Lock(m_SummaryLock); + m_SummaryEntries.emplace(Lsn, std::move(Entry)); +} - while (!PendingJobs.IsEmpty()) +void +ExecSessionRunner::WriteSummaryFiles() +{ + if (m_Config.OutputPath.empty() || m_SummaryEntries.empty()) { - // TODO: improve this logic - zen::Sleep(500); - - DrainCompletedJobs(); - SendOrchestratorHeartbeat(); + return; } // Merge timing data from queue history into summary entries - if (!SummaryEntries.empty()) + // RunnerAction::State indices (can't include functionrunner.h from here) + constexpr int kStateNew = 0; + constexpr int kStatePending = 1; + constexpr int kStateRunning = 3; + constexpr int kStateCompleted = 4; // first terminal state + constexpr int kStateCount = 8; + + for (const auto& HistEntry : m_Session.GetQueueHistory(m_QueueId, 0)) { - // RunnerAction::State indices (can't include functionrunner.h from here) - constexpr int kStateNew = 0; - constexpr int kStatePending = 1; - constexpr int kStateRunning = 3; - constexpr int kStateCompleted = 4; // first terminal state - constexpr int kStateCount = 8; - - for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0)) + std::lock_guard Lock(m_SummaryLock); + if (auto It = m_SummaryEntries.find(HistEntry.Lsn); It != m_SummaryEntries.end()) { - std::lock_guard Lock(SummaryLock); - if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end()) + // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled) + uint64_t EndTick = 0; + for (int S = kStateCompleted; S < kStateCount; ++S) { - // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled) - uint64_t EndTick = 0; - for (int S = kStateCompleted; S < kStateCount; ++S) - { - if (HistEntry.Timestamps[S] != 0) - { - EndTick = HistEntry.Timestamps[S]; - break; - } - } - uint64_t StartTick = HistEntry.Timestamps[kStateNew]; - if (EndTick > StartTick) + if (HistEntry.Timestamps[S] != 0) { - It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond)); + EndTick = HistEntry.Timestamps[S]; + break; } - It->second.CpuSeconds = HistEntry.CpuSeconds; - It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending]; - It->second.StartedTicks = HistEntry.Timestamps[kStateRunning]; - It->second.ExecutionLocation = HistEntry.ExecutionLocation; } + uint64_t StartTick = HistEntry.Timestamps[kStateNew]; + if (EndTick > StartTick) + { + It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond)); + } + It->second.CpuSeconds = HistEntry.CpuSeconds; + It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending]; + It->second.StartedTicks = HistEntry.Timestamps[kStateRunning]; + It->second.ExecutionLocation = HistEntry.ExecutionLocation; } } - // Write summary file if output path is set + // Sort entries by recording index - if (!m_OutputPath.empty() && !SummaryEntries.empty()) + std::vector Sorted; + Sorted.reserve(m_SummaryEntries.size()); + for (auto& [_, Entry] : m_SummaryEntries) { - std::vector Sorted; - Sorted.reserve(SummaryEntries.size()); - for (auto& [_, Entry] : SummaryEntries) - { - Sorted.push_back(std::move(Entry)); - } - - std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) { - return A.RecordingIndex < B.RecordingIndex; - }); + Sorted.push_back(std::move(Entry)); + } - auto FormatTimestamp = [](uint64_t Ticks) -> std::string { - if (Ticks == 0) - { - return "-"; - } - return DateTime(Ticks).ToString("%H:%M:%S.%s"); - }; - - ExtendableStringBuilder<4096> Summary; - Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n", - "LSN", - "Index", - "ActionId", - "Function", - "InAtt", - "InBytes", - "OutAtt", - "OutBytes", - "Wall(s)", - "CPU(s)", - "Submitted", - "Started", - "Location")); - Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n", - "", - "", - "", - "", - "", - "", - "", - "", - "", - "", - "", - "", - "")); + std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) { + return A.RecordingIndex < B.RecordingIndex; + }); - for (const ActionSummaryEntry& Entry : Sorted) + auto FormatTimestamp = [](uint64_t Ticks) -> std::string { + if (Ticks == 0) { - Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n", - Entry.Lsn, - Entry.RecordingIndex, - Entry.ActionId, - Entry.FunctionName, - Entry.InputAttachments, - NiceBytes(Entry.InputBytes), - Entry.OutputAttachments, - NiceBytes(Entry.OutputBytes), - Entry.WallSeconds, - Entry.CpuSeconds, - FormatTimestamp(Entry.SubmittedTicks), - FormatTimestamp(Entry.StartedTicks), - Entry.ExecutionLocation)); + return "-"; } + return DateTime(Ticks).ToString("%H:%M:%S.%s"); + }; - std::filesystem::path SummaryPath = m_OutputPath / "summary.txt"; - std::string_view SummaryStr = Summary; - zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size())); + // Write summary.txt + + ExtendableStringBuilder<4096> Summary; + Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n", + "LSN", + "Index", + "ActionId", + "Function", + "InAtt", + "InBytes", + "OutAtt", + "OutBytes", + "Wall(s)", + "CPU(s)", + "Submitted", + "Started", + "Location")); + Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "")); + + for (const ActionSummaryEntry& Entry : Sorted) + { + Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n", + Entry.Lsn, + Entry.RecordingIndex, + Entry.ActionId, + Entry.FunctionName, + Entry.InputAttachments, + NiceBytes(Entry.InputBytes), + Entry.OutputAttachments, + NiceBytes(Entry.OutputBytes), + Entry.WallSeconds, + Entry.CpuSeconds, + FormatTimestamp(Entry.SubmittedTicks), + FormatTimestamp(Entry.StartedTicks), + Entry.ExecutionLocation)); + } - ZEN_CONSOLE("wrote summary to {}", SummaryPath.string()); + std::filesystem::path SummaryPath = m_Config.OutputPath / "summary.txt"; + std::string_view SummaryStr = Summary; + zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size())); - if (!m_Binary) - { - auto EscapeHtml = [](std::string_view Input) -> std::string { - std::string Out; - Out.reserve(Input.size()); - for (char C : Input) - { - switch (C) - { - case '&': - Out += "&"; - break; - case '<': - Out += "<"; - break; - case '>': - Out += ">"; - break; - case '"': - Out += """; - break; - case '\'': - Out += "'"; - break; - default: - Out += C; - } - } - return Out; - }; + ZEN_CONSOLE("wrote summary to {}", SummaryPath.string()); - auto EscapeJson = [](std::string_view Input) -> std::string { - std::string Out; - Out.reserve(Input.size()); - for (char C : Input) - { - switch (C) - { - case '"': - Out += "\\\""; - break; - case '\\': - Out += "\\\\"; - break; - case '\n': - Out += "\\n"; - break; - case '\r': - Out += "\\r"; - break; - case '\t': - Out += "\\t"; - break; - default: - if (static_cast(C) < 0x20) - { - Out += fmt::format("\\u{:04x}", static_cast(static_cast(C))); - } - else - { - Out += C; - } - } - } - return Out; - }; + // Write summary.html - ExtendableStringBuilder<8192> Html; + if (!m_Config.Binary) + { + ExtendableStringBuilder<8192> Html; - Html.Append(std::string_view(R"( + Html.Append(std::string_view(R"( Exec Summary