aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp1374
1 files changed, 1374 insertions, 0 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
new file mode 100644
index 000000000..42c7119e7
--- /dev/null
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -0,0 +1,1374 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "exec_cmd.h"
+
+#include <zencompute/computeservice.h>
+#include <zencompute/recordingreader.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/session.h>
+#include <zencore/stream.h>
+#include <zencore/string.h>
+#include <zencore/system.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/packageformat.h>
+
+#include <EASTL/hash_map.h>
+#include <EASTL/hash_set.h>
+#include <EASTL/map.h>
+
+using namespace std::literals;
+
+namespace eastl {
+
+template<>
+struct hash<zen::IoHash> : public zen::IoHash::Hasher
+{
+};
+
+} // namespace eastl
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen {
+
+ExecCommand::ExecCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), "<hosturl>");
+ m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>");
+ m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>");
+ m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>");
+ m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>");
+ m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>");
+ m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>");
+ m_Options.add_option("", "", "orch", "Orchestrator URL for worker discovery", cxxopts::value(m_OrchestratorUrl), "<url>");
+ m_Options.add_option("",
+ "",
+ "mode",
+ "Select execution mode (http,inproc,dump,direct,beacon,buildlog)",
+ cxxopts::value(m_Mode)->default_value("http"),
+ "<string>");
+ m_Options
+ .add_option("", "", "dump-actions", "Dump each action to console as it is dispatched", cxxopts::value(m_DumpActions), "<bool>");
+ m_Options.add_option("", "o", "output", "Save action results to directory", cxxopts::value(m_OutputPath), "<path>");
+ m_Options.add_option("", "", "binary", "Write output as binary packages instead of YAML", cxxopts::value(m_Binary), "<bool>");
+ m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>");
+ m_Options.parse_positional("mode");
+}
+
+ExecCommand::~ExecCommand()
+{
+}
+
+void
+ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ // Configure
+
+ if (!ParseOptions(argc, argv))
+ {
+ return;
+ }
+
+ m_HostName = ResolveTargetHostSpec(m_HostName);
+
+ if (m_RecordingPath.empty())
+ {
+ throw OptionParseException("replay path is required!", m_Options.help());
+ }
+
+ m_VerboseLogging = GlobalOptions.IsVerbose;
+ m_QuietLogging = m_Quiet && !m_VerboseLogging;
+
+ enum ExecMode
+ {
+ kHttp,
+ kDirect,
+ kInproc,
+ kDump,
+ kBeacon,
+ kBuildLog
+ } Mode;
+
+ if (m_Mode == "http"sv)
+ {
+ Mode = kHttp;
+ }
+ else if (m_Mode == "direct"sv)
+ {
+ Mode = kDirect;
+ }
+ else if (m_Mode == "inproc"sv)
+ {
+ Mode = kInproc;
+ }
+ else if (m_Mode == "dump"sv)
+ {
+ Mode = kDump;
+ }
+ else if (m_Mode == "beacon"sv)
+ {
+ Mode = kBeacon;
+ }
+ else if (m_Mode == "buildlog"sv)
+ {
+ Mode = kBuildLog;
+ }
+ else
+ {
+ throw OptionParseException("invalid mode specified!", m_Options.help());
+ }
+
+ // Gather information from recording path
+
+ std::unique_ptr<zen::compute::RecordingReader> Reader;
+ std::unique_ptr<zen::compute::UeRecordingReader> UeReader;
+
+ std::filesystem::path RecordingPath{m_RecordingPath};
+
+ if (!std::filesystem::is_directory(RecordingPath))
+ {
+ throw OptionParseException("replay path should be a directory path!", m_Options.help());
+ }
+ else
+ {
+ if (std::filesystem::is_directory(RecordingPath / "cid"))
+ {
+ Reader = std::make_unique<zen::compute::RecordingReader>(RecordingPath);
+ m_WorkerMap = Reader->ReadWorkers();
+ m_ChunkResolver = Reader.get();
+ m_RecordingReader = Reader.get();
+ }
+ else
+ {
+ UeReader = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath);
+ m_WorkerMap = UeReader->ReadWorkers();
+ m_ChunkResolver = UeReader.get();
+ m_RecordingReader = UeReader.get();
+ }
+ }
+
+ ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount());
+
+ for (auto& Kv : m_WorkerMap)
+ {
+ CbObject WorkerDesc = Kv.second.GetObject();
+ const IoHash& WorkerId = Kv.first;
+
+ RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId);
+
+ if (m_VerboseLogging)
+ {
+ zen::ExtendableStringBuilder<1024> ObjStr;
+# if 0
+ zen::CompactBinaryToJson(WorkerDesc, ObjStr);
+ ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr);
+# else
+ zen::CompactBinaryToYaml(WorkerDesc, ObjStr);
+ ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr);
+# endif
+ }
+ }
+
+ if (m_VerboseLogging)
+ {
+ EmitFunctionList(m_FunctionList);
+ }
+
+ // Iterate over work items and dispatch or log them
+
+ int ReturnValue = 0;
+
+ Stopwatch ExecTimer;
+
+ switch (Mode)
+ {
+ case kHttp:
+ // Forward requests to HTTP function service
+ ReturnValue = HttpExecute();
+ break;
+
+ case kDirect:
+ // Not currently supported
+ ReturnValue = LocalMessagingExecute();
+ break;
+
+ case kInproc:
+ // Handle execution in-core (by spawning child processes)
+ ReturnValue = InProcessExecute();
+ break;
+
+ case kDump:
+ // Dump high level information about actions to console
+ ReturnValue = DumpWorkItems();
+ break;
+
+ case kBeacon:
+ ReturnValue = BeaconExecute();
+ break;
+
+ case kBuildLog:
+ ReturnValue = BuildActionsLog();
+ break;
+
+ default:
+ ZEN_ERROR("Unknown operating mode! No work submitted");
+
+ ReturnValue = 1;
+ }
+
+ ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
+
+ if (!ReturnValue)
+ {
+ ZEN_CONSOLE("all work items completed successfully");
+ }
+ else
+ {
+ ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
+ }
+}
+
+int
+ExecCommand::InProcessExecute()
+{
+ ZEN_ASSERT(m_ChunkResolver);
+ ChunkResolver& Resolver = *m_ChunkResolver;
+
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+ ComputeSession.AddLocalRunner(Resolver, TempPath);
+
+ return ExecUsingSession(ComputeSession);
+}
+
+int
+ExecCommand::ExecUsingSession(zen::compute::ComputeServiceSession& ComputeSession)
+{
+ struct JobTracker
+ {
+ public:
+ inline void Insert(int LsnField)
+ {
+ RwLock::ExclusiveLockScope _(Lock);
+ PendingJobs.insert(LsnField);
+ }
+
+ inline bool IsEmpty() const
+ {
+ RwLock::SharedLockScope _(Lock);
+ return PendingJobs.empty();
+ }
+
+ inline void Remove(int CompleteLsn)
+ {
+ RwLock::ExclusiveLockScope _(Lock);
+ PendingJobs.erase(CompleteLsn);
+ }
+
+ inline size_t GetSize() const
+ {
+ RwLock::SharedLockScope _(Lock);
+ return PendingJobs.size();
+ }
+
+ private:
+ mutable RwLock Lock;
+ std::unordered_set<int> PendingJobs;
+ };
+
+ JobTracker PendingJobs;
+
+ struct ActionSummaryEntry
+ {
+ int32_t Lsn = 0;
+ int RecordingIndex = 0;
+ IoHash ActionId;
+ std::string FunctionName;
+ int InputAttachments = 0;
+ uint64_t InputBytes = 0;
+ int OutputAttachments = 0;
+ uint64_t OutputBytes = 0;
+ float WallSeconds = 0.0f;
+ float CpuSeconds = 0.0f;
+ uint64_t SubmittedTicks = 0;
+ uint64_t StartedTicks = 0;
+ std::string ExecutionLocation;
+ };
+
+ std::mutex SummaryLock;
+ std::unordered_map<int32_t, ActionSummaryEntry> SummaryEntries;
+
+ ComputeSession.WaitUntilReady();
+
+ // Register as a client with the orchestrator (best-effort)
+
+ std::string OrchestratorClientId;
+
+ if (!m_OrchestratorUrl.empty())
+ {
+ try
+ {
+ HttpClient OrchestratorClient(m_OrchestratorUrl);
+
+ CbObjectWriter Ann;
+ Ann << "session_id"sv << GetSessionId();
+ Ann << "hostname"sv << std::string_view(GetMachineName());
+
+ CbObjectWriter Meta;
+ Meta << "source"sv
+ << "zen-exec"sv;
+ Ann << "metadata"sv << Meta.Save();
+
+ auto Resp = OrchestratorClient.Post("/orch/clients", Ann.Save());
+ if (Resp.IsSuccess())
+ {
+ OrchestratorClientId = std::string(Resp.AsObject()["id"].AsString());
+ ZEN_CONSOLE_INFO("registered with orchestrator as {}", OrchestratorClientId);
+ }
+ else
+ {
+ ZEN_WARN("failed to register with orchestrator (status {})", static_cast<int>(Resp.StatusCode));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("failed to register with orchestrator: {}", Ex.what());
+ }
+ }
+
+ Stopwatch OrchestratorHeartbeatTimer;
+
+ auto SendOrchestratorHeartbeat = [&] {
+ if (OrchestratorClientId.empty() || OrchestratorHeartbeatTimer.GetElapsedTimeMs() < 30'000)
+ {
+ return;
+ }
+ OrchestratorHeartbeatTimer.Reset();
+ try
+ {
+ HttpClient OrchestratorClient(m_OrchestratorUrl);
+ std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/update", OrchestratorClientId));
+ }
+ catch (...)
+ {
+ }
+ };
+
+ auto ClientCleanup = MakeGuard([&] {
+ if (!OrchestratorClientId.empty())
+ {
+ try
+ {
+ HttpClient OrchestratorClient(m_OrchestratorUrl);
+ std::ignore = OrchestratorClient.Post(fmt::format("/orch/clients/{}/complete", OrchestratorClientId));
+ }
+ catch (...)
+ {
+ }
+ }
+ });
+
+ // Create a queue to group all actions from this exec session
+
+ CbObjectWriter Metadata;
+ Metadata << "source"sv
+ << "zen-exec"sv;
+
+ auto QueueResult = ComputeSession.CreateQueue("zen-exec", Metadata.Save());
+ const int QueueId = QueueResult.QueueId;
+ if (!QueueId)
+ {
+ ZEN_ERROR("failed to create compute queue");
+ return 1;
+ }
+
+ auto QueueCleanup = MakeGuard([&] { ComputeSession.DeleteQueue(QueueId); });
+
+ if (!m_OutputPath.empty())
+ {
+ zen::CreateDirectories(m_OutputPath);
+ }
+
+ std::atomic<int> IsDraining{0};
+
+ auto DrainCompletedJobs = [&] {
+ if (IsDraining.exchange(1))
+ {
+ return;
+ }
+
+ auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); });
+
+ CbObjectWriter Cbo;
+ ComputeSession.GetQueueCompleted(QueueId, Cbo);
+
+ if (CbObject Completed = Cbo.Save())
+ {
+ for (auto& It : Completed["completed"sv])
+ {
+ int32_t CompleteLsn = It.AsInt32();
+
+ CbPackage ResultPackage;
+ HttpResponseCode Response = ComputeSession.GetActionResult(CompleteLsn, /* out */ ResultPackage);
+
+ if (Response == HttpResponseCode::OK)
+ {
+ if (!m_OutputPath.empty() && ResultPackage)
+ {
+ int OutputAttachments = 0;
+ uint64_t OutputBytes = 0;
+
+ if (!m_Binary)
+ {
+ // Write the root object as YAML
+ ExtendableStringBuilder<4096> YamlStr;
+ CompactBinaryToYaml(ResultPackage.GetObject(), YamlStr);
+
+ std::string_view Yaml = YamlStr;
+ zen::WriteFile(m_OutputPath / fmt::format("{}.result.yaml", CompleteLsn),
+ IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
+
+ // Write decompressed attachments
+ auto Attachments = ResultPackage.GetAttachments();
+
+ if (!Attachments.empty())
+ {
+ std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.result.attachments", CompleteLsn);
+ zen::CreateDirectories(AttDir);
+
+ for (const CbAttachment& Att : Attachments)
+ {
+ ++OutputAttachments;
+
+ IoHash AttHash = Att.GetHash();
+
+ if (Att.IsCompressedBinary())
+ {
+ SharedBuffer Decompressed = Att.AsCompressedBinary().Decompress();
+ OutputBytes += Decompressed.GetSize();
+ zen::WriteFile(AttDir / AttHash.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
+ }
+ else
+ {
+ SharedBuffer Binary = Att.AsBinary();
+ OutputBytes += Binary.GetSize();
+ zen::WriteFile(AttDir / AttHash.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Binary.GetData(), Binary.GetSize()));
+ }
+ }
+ }
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved result: {}/{}.result.yaml ({} attachments)",
+ m_OutputPath.string(),
+ CompleteLsn,
+ OutputAttachments);
+ }
+ }
+ else
+ {
+ CompositeBuffer Serialized = FormatPackageMessageBuffer(ResultPackage);
+ zen::WriteFile(m_OutputPath / fmt::format("{}.result.pkg", CompleteLsn), std::move(Serialized));
+
+ for (const CbAttachment& Att : ResultPackage.GetAttachments())
+ {
+ ++OutputAttachments;
+ OutputBytes += Att.AsBinary().GetSize();
+ }
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved result: {}/{}.result.pkg", m_OutputPath.string(), CompleteLsn);
+ }
+ }
+
+ std::lock_guard Lock(SummaryLock);
+ if (auto It2 = SummaryEntries.find(CompleteLsn); It2 != SummaryEntries.end())
+ {
+ It2->second.OutputAttachments = OutputAttachments;
+ It2->second.OutputBytes = OutputBytes;
+ }
+ }
+
+ PendingJobs.Remove(CompleteLsn);
+
+ ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize());
+ }
+ }
+ }
+ };
+
+ // Describe workers
+
+ ZEN_CONSOLE("describing {} workers", m_WorkerMap.size());
+
+ for (auto Kv : m_WorkerMap)
+ {
+ CbPackage WorkerDesc = Kv.second;
+
+ ComputeSession.RegisterWorker(WorkerDesc);
+ }
+
+ // Then submit work items
+
+ int FailedWorkCounter = 0;
+ size_t RemainingWorkItems = m_RecordingReader->GetActionCount();
+ int SubmittedWorkItems = 0;
+
+ ZEN_CONSOLE("submitting {} work items", RemainingWorkItems);
+
+ int OffsetCounter = m_Offset;
+ int StrideCounter = m_Stride;
+
+ auto ShouldSchedule = [&]() -> bool {
+ if (m_Limit && SubmittedWorkItems >= m_Limit)
+ {
+ // Limit reached, ignore
+
+ return false;
+ }
+
+ if (OffsetCounter && OffsetCounter--)
+ {
+ // Still in offset, ignore
+
+ return false;
+ }
+
+ if (--StrideCounter == 0)
+ {
+ StrideCounter = m_Stride;
+
+ return true;
+ }
+
+ return false;
+ };
+
+ int TargetParallelism = 8;
+
+ if (OffsetCounter || StrideCounter || m_Limit)
+ {
+ TargetParallelism = 1;
+ }
+
+ std::atomic<int> RecordingIndex{0};
+
+ m_RecordingReader->IterateActions(
+ [&](CbObject ActionObject, const IoHash& ActionId) {
+ // Enqueue job
+
+ const int CurrentRecordingIndex = RecordingIndex++;
+
+ Stopwatch SubmitTimer;
+
+ const int Priority = 0;
+
+ if (ShouldSchedule())
+ {
+ if (m_VerboseLogging)
+ {
+ int AttachmentCount = 0;
+ uint64_t AttachmentBytes = 0;
+ eastl::hash_set<IoHash> ReferencedChunks;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachData = Field.AsAttachment();
+
+ ReferencedChunks.insert(AttachData);
+ ++AttachmentCount;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
+ {
+ AttachmentBytes += ChunkData.GetSize();
+ }
+ });
+
+ zen::ExtendableStringBuilder<1024> ObjStr;
+ zen::CompactBinaryToJson(ActionObject, ObjStr);
+ ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}",
+ ActionId,
+ AttachmentCount,
+ NiceBytes(AttachmentBytes),
+ ObjStr);
+ }
+
+ if (m_DumpActions)
+ {
+ int AttachmentCount = 0;
+ uint64_t AttachmentBytes = 0;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachData = Field.AsAttachment();
+
+ ++AttachmentCount;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
+ {
+ AttachmentBytes += ChunkData.GetSize();
+ }
+ });
+
+ zen::ExtendableStringBuilder<1024> ObjStr;
+ zen::CompactBinaryToYaml(ActionObject, ObjStr);
+ ZEN_CONSOLE("action {} ({} attachments, {}):\n{}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr);
+ }
+
+ if (zen::compute::ComputeServiceSession::EnqueueResult EnqueueResult =
+ ComputeSession.EnqueueActionToQueue(QueueId, ActionObject, Priority))
+ {
+ const int32_t LsnField = EnqueueResult.Lsn;
+
+ --RemainingWorkItems;
+ ++SubmittedWorkItems;
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
+ SubmittedWorkItems,
+ LsnField,
+ NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+ RemainingWorkItems);
+ }
+
+ if (!m_OutputPath.empty())
+ {
+ ActionSummaryEntry Entry;
+ Entry.Lsn = LsnField;
+ Entry.RecordingIndex = CurrentRecordingIndex;
+ Entry.ActionId = ActionId;
+ Entry.FunctionName = std::string(ActionObject["Function"sv].AsString());
+
+ if (!m_Binary)
+ {
+ // Write action object as YAML
+ ExtendableStringBuilder<4096> YamlStr;
+ CompactBinaryToYaml(ActionObject, YamlStr);
+
+ std::string_view Yaml = YamlStr;
+ zen::WriteFile(m_OutputPath / fmt::format("{}.action.yaml", LsnField),
+ IoBuffer(IoBuffer::Clone, Yaml.data(), Yaml.size()));
+
+ // Write decompressed input attachments
+ std::filesystem::path AttDir = m_OutputPath / fmt::format("{}.action.attachments", LsnField);
+ bool AttDirCreated = false;
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachCid = Field.AsAttachment();
+ ++Entry.InputAttachments;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid))
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
+ SharedBuffer Decompressed = Compressed.Decompress();
+
+ Entry.InputBytes += Decompressed.GetSize();
+
+ if (!AttDirCreated)
+ {
+ zen::CreateDirectories(AttDir);
+ AttDirCreated = true;
+ }
+
+ zen::WriteFile(AttDir / AttachCid.ToHexString(),
+ IoBuffer(IoBuffer::Clone, Decompressed.GetData(), Decompressed.GetSize()));
+ }
+ });
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved action: {}/{}.action.yaml ({} attachments)",
+ m_OutputPath.string(),
+ LsnField,
+ Entry.InputAttachments);
+ }
+ }
+ else
+ {
+ // Build a CbPackage from the action and write as .pkg
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionObject);
+
+ ActionObject.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachCid = Field.AsAttachment();
+ ++Entry.InputAttachments;
+
+ if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachCid))
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
+
+ Entry.InputBytes += ChunkData.GetSize();
+ ActionPackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ }
+ });
+
+ CompositeBuffer Serialized = FormatPackageMessageBuffer(ActionPackage);
+ zen::WriteFile(m_OutputPath / fmt::format("{}.action.pkg", LsnField), std::move(Serialized));
+
+ if (!m_QuietLogging)
+ {
+ ZEN_CONSOLE("saved action: {}/{}.action.pkg", m_OutputPath.string(), LsnField);
+ }
+ }
+
+ std::lock_guard Lock(SummaryLock);
+ SummaryEntries.emplace(LsnField, std::move(Entry));
+ }
+
+ PendingJobs.Insert(LsnField);
+ }
+ else
+ {
+ if (!m_QuietLogging)
+ {
+ std::string_view FunctionName = ActionObject["Function"sv].AsString();
+ const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid();
+ const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+ ZEN_ERROR(
+ "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work "
+ "descriptor "
+ "at: 'file://{}'",
+ std::string(FunctionName),
+ FunctionVersion,
+ BuildSystemVersion,
+ "<null>");
+
+ EmitFunctionListOnce(m_FunctionList);
+ }
+
+ ++FailedWorkCounter;
+ }
+ }
+
+ // Check for completed work
+
+ DrainCompletedJobs();
+ SendOrchestratorHeartbeat();
+ },
+ TargetParallelism);
+
+ // Wait until all pending work is complete
+
+ while (!PendingJobs.IsEmpty())
+ {
+ // TODO: improve this logic
+ zen::Sleep(500);
+
+ DrainCompletedJobs();
+ SendOrchestratorHeartbeat();
+ }
+
+ // Merge timing data from queue history into summary entries
+
+ if (!SummaryEntries.empty())
+ {
+ // RunnerAction::State indices (can't include functionrunner.h from here)
+ constexpr int kStateNew = 0;
+ constexpr int kStatePending = 1;
+ constexpr int kStateRunning = 3;
+ constexpr int kStateCompleted = 4; // first terminal state
+ constexpr int kStateCount = 8;
+
+ for (const auto& HistEntry : ComputeSession.GetQueueHistory(QueueId, 0))
+ {
+ std::lock_guard Lock(SummaryLock);
+ if (auto It = SummaryEntries.find(HistEntry.Lsn); It != SummaryEntries.end())
+ {
+ // Find terminal state timestamp (Completed, Failed, Abandoned, or Cancelled)
+ uint64_t EndTick = 0;
+ for (int S = kStateCompleted; S < kStateCount; ++S)
+ {
+ if (HistEntry.Timestamps[S] != 0)
+ {
+ EndTick = HistEntry.Timestamps[S];
+ break;
+ }
+ }
+ uint64_t StartTick = HistEntry.Timestamps[kStateNew];
+ if (EndTick > StartTick)
+ {
+ It->second.WallSeconds = float(double(EndTick - StartTick) / double(TimeSpan::TicksPerSecond));
+ }
+ It->second.CpuSeconds = HistEntry.CpuSeconds;
+ It->second.SubmittedTicks = HistEntry.Timestamps[kStatePending];
+ It->second.StartedTicks = HistEntry.Timestamps[kStateRunning];
+ It->second.ExecutionLocation = HistEntry.ExecutionLocation;
+ }
+ }
+ }
+
+ // Write summary file if output path is set
+
+ if (!m_OutputPath.empty() && !SummaryEntries.empty())
+ {
+ std::vector<ActionSummaryEntry> Sorted;
+ Sorted.reserve(SummaryEntries.size());
+ for (auto& [_, Entry] : SummaryEntries)
+ {
+ Sorted.push_back(std::move(Entry));
+ }
+
+ std::sort(Sorted.begin(), Sorted.end(), [](const ActionSummaryEntry& A, const ActionSummaryEntry& B) {
+ return A.RecordingIndex < B.RecordingIndex;
+ });
+
+ auto FormatTimestamp = [](uint64_t Ticks) -> std::string {
+ if (Ticks == 0)
+ {
+ return "-";
+ }
+ return DateTime(Ticks).ToString("%H:%M:%S.%s");
+ };
+
+ ExtendableStringBuilder<4096> Summary;
+ Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8} {:>8} {:>12} {:>12} {:<24}\n",
+ "LSN",
+ "Index",
+ "ActionId",
+ "Function",
+ "InAtt",
+ "InBytes",
+ "OutAtt",
+ "OutBytes",
+ "Wall(s)",
+ "CPU(s)",
+ "Submitted",
+ "Started",
+ "Location"));
+ Summary.Append(fmt::format("{:-<8} {:-<8} {:-<40} {:-<40} {:-<8} {:-<12} {:-<8} {:-<12} {:-<8} {:-<8} {:-<12} {:-<12} {:-<24}\n",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ ""));
+
+ for (const ActionSummaryEntry& Entry : Sorted)
+ {
+ Summary.Append(fmt::format("{:<8} {:<8} {:<40} {:<40} {:>8} {:>12} {:>8} {:>12} {:>8.2f} {:>8.2f} {:>12} {:>12} {:<24}\n",
+ Entry.Lsn,
+ Entry.RecordingIndex,
+ Entry.ActionId,
+ Entry.FunctionName,
+ Entry.InputAttachments,
+ NiceBytes(Entry.InputBytes),
+ Entry.OutputAttachments,
+ NiceBytes(Entry.OutputBytes),
+ Entry.WallSeconds,
+ Entry.CpuSeconds,
+ FormatTimestamp(Entry.SubmittedTicks),
+ FormatTimestamp(Entry.StartedTicks),
+ Entry.ExecutionLocation));
+ }
+
+ std::filesystem::path SummaryPath = m_OutputPath / "summary.txt";
+ std::string_view SummaryStr = Summary;
+ zen::WriteFile(SummaryPath, IoBuffer(IoBuffer::Clone, SummaryStr.data(), SummaryStr.size()));
+
+ ZEN_CONSOLE("wrote summary to {}", SummaryPath.string());
+
+ if (!m_Binary)
+ {
+ auto EscapeHtml = [](std::string_view Input) -> std::string {
+ std::string Out;
+ Out.reserve(Input.size());
+ for (char C : Input)
+ {
+ switch (C)
+ {
+ case '&':
+ Out += "&amp;";
+ break;
+ case '<':
+ Out += "&lt;";
+ break;
+ case '>':
+ Out += "&gt;";
+ break;
+ case '"':
+ Out += "&quot;";
+ break;
+ case '\'':
+ Out += "&#39;";
+ break;
+ default:
+ Out += C;
+ }
+ }
+ return Out;
+ };
+
+ auto EscapeJson = [](std::string_view Input) -> std::string {
+ std::string Out;
+ Out.reserve(Input.size());
+ for (char C : Input)
+ {
+ switch (C)
+ {
+ case '"':
+ Out += "\\\"";
+ break;
+ case '\\':
+ Out += "\\\\";
+ break;
+ case '\n':
+ Out += "\\n";
+ break;
+ case '\r':
+ Out += "\\r";
+ break;
+ case '\t':
+ Out += "\\t";
+ break;
+ default:
+ if (static_cast<unsigned char>(C) < 0x20)
+ {
+ Out += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(C)));
+ }
+ else
+ {
+ Out += C;
+ }
+ }
+ }
+ return Out;
+ };
+
+ ExtendableStringBuilder<8192> Html;
+
+ Html.Append(std::string_view(R"(<!DOCTYPE html>
+<html><head><meta charset="utf-8"><title>Exec Summary</title>
+<style>
+body{font-family:system-ui,sans-serif;margin:20px;background:#fafafa}
+#container{overflow-y:auto;height:calc(100vh - 120px)}
+table{border-collapse:collapse;width:100%}
+th,td{border:1px solid #ddd;padding:6px 10px;text-align:left;white-space:nowrap}
+th{background:#f0f0f0;cursor:pointer;user-select:none;position:sticky;top:0;z-index:1}
+th:hover{background:#e0e0e0}
+th .arrow{font-size:0.7em;margin-left:4px}
+tr:hover{background:#e8f0fe}
+input{padding:6px 10px;margin-bottom:12px;width:300px;border:1px solid #ccc;border-radius:4px}
+button{padding:6px 14px;margin-left:8px;margin-bottom:12px;border:1px solid #ccc;border-radius:4px;background:#f0f0f0;cursor:pointer}
+button:hover{background:#e0e0e0}
+a{color:#1a73e8;text-decoration:none}
+a:hover{text-decoration:underline}
+.num{text-align:right}
+</style></head><body>
+<h2>Exec Summary</h2>
+<input type="text" id="filter" placeholder="Filter by function name..."><button id="csvBtn">Export CSV</button>
+<div id="container">
+<table><thead><tr>
+<th data-col="0">LSN <span class="arrow"></span></th>
+<th data-col="1">Index <span class="arrow"></span></th>
+<th data-col="2">Action ID <span class="arrow"></span></th>
+<th data-col="3">Function <span class="arrow"></span></th>
+<th data-col="4">In Attachments <span class="arrow"></span></th>
+<th data-col="5">In Bytes <span class="arrow"></span></th>
+<th data-col="6">Out Attachments <span class="arrow"></span></th>
+<th data-col="7">Out Bytes <span class="arrow"></span></th>
+<th data-col="8">Wall(s) <span class="arrow"></span></th>
+<th data-col="9">CPU(s) <span class="arrow"></span></th>
+<th data-col="10">Submitted <span class="arrow"></span></th>
+<th data-col="11">Started <span class="arrow"></span></th>
+<th data-col="12">Location <span class="arrow"></span></th>
+</tr></thead><tbody>
+<tr id="spacerTop"><td colspan="13"></td></tr>
+<tr id="spacerBot"><td colspan="13"></td></tr>
+</tbody></table></div>
+<script>
+const DATA=[
+)"));
+
+ std::string_view ResultExt = ".result.yaml";
+ std::string_view ActionExt = ".action.yaml";
+
+ for (const ActionSummaryEntry& Entry : Sorted)
+ {
+ std::string SafeName = EscapeJson(EscapeHtml(Entry.FunctionName));
+ std::string ActionIdStr = Entry.ActionId.ToHexString();
+ std::string ActionLink;
+ if (!ActionExt.empty())
+ {
+ ActionLink = EscapeJson(fmt::format(" <a href=\"{}{}\">[action]</a>", Entry.Lsn, ActionExt));
+ }
+
+ // Indices: 0=lsn, 1=idx, 2=actionId, 3=fn, 4=inAtt, 5=inBytes, 6=outAtt, 7=outBytes,
+ // 8=wall, 9=cpu, 10=niceBytesIn, 11=niceBytesOut, 12=actionLink,
+ // 13=submittedTicks, 14=startedTicks, 15=submittedDisplay, 16=startedDisplay,
+ // 17=location
+ Html.Append(
+ fmt::format("[{},{},\"{}\",\"{}\",{},{},{},{},{:.6f},{:.6f},\"{}\",\"{}\",\"{}\",{},{},\"{}\",\"{}\",\"{}\"],\n",
+ Entry.Lsn,
+ Entry.RecordingIndex,
+ ActionIdStr,
+ SafeName,
+ Entry.InputAttachments,
+ Entry.InputBytes,
+ Entry.OutputAttachments,
+ Entry.OutputBytes,
+ Entry.WallSeconds,
+ Entry.CpuSeconds,
+ EscapeJson(NiceBytes(Entry.InputBytes)),
+ EscapeJson(NiceBytes(Entry.OutputBytes)),
+ ActionLink,
+ Entry.SubmittedTicks,
+ Entry.StartedTicks,
+ FormatTimestamp(Entry.SubmittedTicks),
+ FormatTimestamp(Entry.StartedTicks),
+ EscapeJson(EscapeHtml(Entry.ExecutionLocation))));
+ }
+
+ Html.Append(fmt::format(R"(];
+const RESULT_EXT="{}";
+)",
+ ResultExt));
+
+ Html.Append(std::string_view(R"JS((function(){
+const ROW_H=33,BUF=20;
+const container=document.getElementById("container");
+const tbody=container.querySelector("tbody");
+const headers=container.querySelectorAll("th");
+const filterInput=document.getElementById("filter");
+const spacerTop=document.getElementById("spacerTop");
+const spacerBot=document.getElementById("spacerBot");
+let view=[...DATA.keys()];
+let sortCol=-1,sortAsc=true;
+const COLS=[
+ {f:0,t:"n"},{f:1,t:"n"},{f:2,t:"s"},{f:3,t:"s"},
+ {f:4,t:"n"},{f:5,t:"n"},{f:6,t:"n"},{f:7,t:"n"},
+ {f:8,t:"n"},{f:9,t:"n"},{f:13,t:"n"},{f:14,t:"n"},{f:17,t:"s"}
+];
+function rowHtml(i){
+ const d=DATA[view[i]];
+ const bg=i%2?' style="background:#f9f9f9"':'';
+ return '<tr'+bg+'>'+
+ '<td class="num"><a href="'+d[0]+RESULT_EXT+'">'+d[0]+'</a></td>'+
+ '<td class="num">'+d[1]+'</td>'+
+ '<td><code>'+d[2]+'</code></td>'+
+ '<td>'+d[3]+d[12]+'</td>'+
+ '<td class="num">'+d[4]+'</td>'+
+ '<td class="num">'+d[10]+'</td>'+
+ '<td class="num">'+d[6]+'</td>'+
+ '<td class="num">'+d[11]+'</td>'+
+ '<td class="num">'+d[8].toFixed(2)+'</td>'+
+ '<td class="num">'+d[9].toFixed(2)+'</td>'+
+ '<td class="num">'+d[15]+'</td>'+
+ '<td class="num">'+d[16]+'</td>'+
+ '<td>'+d[17]+'</td></tr>';
+}
+let lastFirst=-1,lastLast=-1;
+function render(){
+ const scrollTop=container.scrollTop;
+ const viewH=container.clientHeight;
+ let first=Math.floor(scrollTop/ROW_H)-BUF;
+ let last=Math.ceil((scrollTop+viewH)/ROW_H)+BUF;
+ if(first<0) first=0;
+ if(last>=view.length) last=view.length-1;
+ if(first===lastFirst&&last===lastLast) return;
+ lastFirst=first;lastLast=last;
+ const rows=[];
+ for(let i=first;i<=last;i++) rows.push(rowHtml(i));
+ spacerTop.style.height=(first*ROW_H)+'px';
+ spacerBot.style.height=((view.length-1-last)*ROW_H)+'px';
+ const mid=rows.join('');
+ const topTr='<tr id="spacerTop"><td colspan="13" style="border:0;padding:0;height:'+spacerTop.style.height+'"></td></tr>';
+ const botTr='<tr id="spacerBot"><td colspan="13" style="border:0;padding:0;height:'+spacerBot.style.height+'"></td></tr>';
+ tbody.innerHTML=topTr+mid+botTr;
+}
+function applySort(){
+ if(sortCol<0) return;
+ const c=COLS[sortCol];
+ view.sort((a,b)=>{
+ const va=DATA[a][c.f],vb=DATA[b][c.f];
+ if(c.t==="n") return sortAsc?va-vb:vb-va;
+ return sortAsc?(va<vb?-1:va>vb?1:0):(va>vb?-1:va<vb?1:0);
+ });
+}
+function rebuild(){
+ const q=filterInput.value.toLowerCase();
+ view=[];
+ for(let i=0;i<DATA.length;i++){
+ if(!q||DATA[i][3].toLowerCase().includes(q)) view.push(i);
+ }
+ applySort();
+ lastFirst=lastLast=-1;
+ render();
+}
+headers.forEach(th=>{
+ th.addEventListener("click",()=>{
+ const col=parseInt(th.dataset.col);
+ if(sortCol===col){sortAsc=!sortAsc}else{sortCol=col;sortAsc=true}
+ headers.forEach(h=>h.querySelector(".arrow").textContent="");
+ th.querySelector(".arrow").textContent=sortAsc?"\u25B2":"\u25BC";
+ applySort();
+ lastFirst=lastLast=-1;
+ render();
+ });
+});
+filterInput.addEventListener("input",()=>rebuild());
+let ticking=false;
+container.addEventListener("scroll",()=>{
+ if(!ticking){ticking=true;requestAnimationFrame(()=>{render();ticking=false})}
+});
+rebuild();
+document.getElementById("csvBtn").addEventListener("click",()=>{
+ const H=["LSN","Index","Action ID","Function","In Attachments","In Bytes","Out Attachments","Out Bytes","Wall(s)","CPU(s)","Submitted","Started","Location"];
+ const esc=v=>{const s=String(v);return s.includes(',')||s.includes('"')||s.includes('\n')?'"'+s.replace(/"/g,'""')+'"':s};
+ const rows=[H.join(",")];
+ for(let i=0;i<view.length;i++){
+ const d=DATA[view[i]];
+ rows.push([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7],d[8],d[9],d[15],d[16],d[17]].map(esc).join(","));
+ }
+ const blob=new Blob([rows.join("\n")],{type:"text/csv"});
+ const a=document.createElement("a");
+ a.href=URL.createObjectURL(blob);
+ a.download="summary.csv";
+ a.click();
+ URL.revokeObjectURL(a.href);
+});
+})();
+</script></body></html>
+)JS"));
+
+ std::filesystem::path HtmlPath = m_OutputPath / "summary.html";
+ std::string_view HtmlStr = Html;
+ zen::WriteFile(HtmlPath, IoBuffer(IoBuffer::Clone, HtmlStr.data(), HtmlStr.size()));
+
+ ZEN_CONSOLE("wrote HTML summary to {}", HtmlPath.string());
+ }
+ }
+
+ if (FailedWorkCounter)
+ {
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+ExecCommand::LocalMessagingExecute()
+{
+ // Non-HTTP work submission path
+
+ // To be reimplemented using final transport
+
+ return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+int
+ExecCommand::HttpExecute()
+{
+ ZEN_ASSERT(m_ChunkResolver);
+ ChunkResolver& Resolver = *m_ChunkResolver;
+
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+ ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName);
+
+ return ExecUsingSession(ComputeSession);
+}
+
+int
+ExecCommand::BeaconExecute()
+{
+ ZEN_ASSERT(m_ChunkResolver);
+ ChunkResolver& Resolver = *m_ChunkResolver;
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+
+ if (!m_OrchestratorUrl.empty())
+ {
+ ZEN_CONSOLE_INFO("using orchestrator at {}", m_OrchestratorUrl);
+ ComputeSession.SetOrchestratorEndpoint(m_OrchestratorUrl);
+ ComputeSession.SetOrchestratorBasePath(TempPath);
+ }
+ else
+ {
+ ZEN_CONSOLE_INFO("note: using hard-coded local worker path");
+ ComputeSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558");
+ }
+
+ return ExecUsingSession(ComputeSession);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId)
+{
+ const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid();
+
+ for (auto& Item : WorkerDesc["functions"sv])
+ {
+ CbObjectView Function = Item.AsObjectView();
+
+ std::string_view FunctionName = Function["name"sv].AsString();
+ const Guid FunctionVersion = Function["version"sv].AsUuid();
+
+ m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
+ .FunctionVersion = FunctionVersion,
+ .BuildSystemVersion = WorkerBuildSystemVersion,
+ .WorkerId = WorkerId});
+ }
+}
+
+void
+ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList)
+{
+ if (m_FunctionListEmittedOnce == false)
+ {
+ EmitFunctionList(FunctionList);
+
+ m_FunctionListEmittedOnce = true;
+ }
+}
+
+int
+ExecCommand::DumpWorkItems()
+{
+ std::atomic<int> EmittedCount{0};
+
+ eastl::hash_map<IoHash, uint64_t> SeenAttachments; // Attachment CID -> count of references
+
+ m_RecordingReader->IterateActions(
+ [&](CbObject ActionObject, const IoHash& ActionId) {
+ eastl::hash_map<IoHash, CompressedBuffer> Attachments;
+
+ uint64_t AttachmentBytes = 0;
+ uint64_t UncompressedAttachmentBytes = 0;
+
+ ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid);
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ Attachments[AttachmentCid] = CompressedData;
+
+ AttachmentBytes += CompressedData.GetCompressedSize();
+ UncompressedAttachmentBytes += CompressedData.DecodeRawSize();
+
+ if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted)
+ {
+ ++Iter->second;
+ }
+ });
+
+ zen::ExtendableStringBuilder<1024> ObjStr;
+
+# if 0
+ zen::CompactBinaryToJson(ActionObject, ObjStr);
+ ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr);
+# else
+ zen::CompactBinaryToYaml(ActionObject, ObjStr);
+ ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}",
+ ActionId,
+ Attachments.size(),
+ AttachmentBytes,
+ UncompressedAttachmentBytes,
+ ObjStr);
+# endif
+
+ ++EmittedCount;
+ },
+ 1);
+
+ ZEN_CONSOLE("emitted: {} actions", EmittedCount.load());
+
+ eastl::map<uint64_t, std::vector<IoHash>> ReferenceHistogram;
+
+ for (const auto& [K, V] : SeenAttachments)
+ {
+ if (V > 1)
+ {
+ ReferenceHistogram[V].push_back(K);
+ }
+ }
+
+ for (const auto& [RefCount, Cids] : ReferenceHistogram)
+ {
+ ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount);
+ }
+
+ return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+int
+ExecCommand::BuildActionsLog()
+{
+ ZEN_ASSERT(m_ChunkResolver);
+ ChunkResolver& Resolver = *m_ChunkResolver;
+
+ if (m_RecordingPath.empty())
+ {
+ throw OptionParseException("need to specify recording path", m_Options.help());
+ }
+
+ if (std::filesystem::exists(m_RecordingLogPath))
+ {
+ throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help());
+ }
+
+ ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!");
+
+ std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
+
+ zen::compute::ComputeServiceSession ComputeSession(Resolver);
+ ComputeSession.StartRecording(Resolver, m_RecordingLogPath);
+
+ return ExecUsingSession(ComputeSession);
+}
+
+void
+ExecCommand::EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList)
+{
+ ZEN_CONSOLE("=== Known functions:\n===========================");
+
+ ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id");
+
+ for (const FunctionDefinition& Func : FunctionList)
+ {
+ ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId);
+ }
+
+ ZEN_CONSOLE("===========================");
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES