aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorzousar <[email protected]>2026-02-18 23:19:14 -0700
committerzousar <[email protected]>2026-02-18 23:19:14 -0700
commit2ba28acaf034722452f82cfb07afc0a4bb90eeab (patch)
treec00dea385597180673be6e02aca6c07d9ef6ec00 /src
parentupdatefrontend (diff)
parentstructured compute basics (#714) (diff)
downloadzen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.tar.xz
zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.zip
Merge branch 'main' into zs/web-ui-improvements
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/exec_cmd.cpp654
-rw-r--r--src/zen/cmds/exec_cmd.h97
-rw-r--r--src/zen/xmake.lua5
-rw-r--r--src/zen/zen.cpp39
-rw-r--r--src/zencompute-test/xmake.lua9
-rw-r--r--src/zencompute-test/zencompute-test.cpp32
-rw-r--r--src/zencompute/actionrecorder.cpp258
-rw-r--r--src/zencompute/actionrecorder.h91
-rw-r--r--src/zencompute/functionrunner.cpp112
-rw-r--r--src/zencompute/functionrunner.h207
-rw-r--r--src/zencompute/functionservice.cpp957
-rw-r--r--src/zencompute/httpfunctionservice.cpp709
-rw-r--r--src/zencompute/httporchestrator.cpp81
-rw-r--r--src/zencompute/include/zencompute/functionservice.h132
-rw-r--r--src/zencompute/include/zencompute/httpfunctionservice.h73
-rw-r--r--src/zencompute/include/zencompute/httporchestrator.h44
-rw-r--r--src/zencompute/include/zencompute/recordingreader.h127
-rw-r--r--src/zencompute/include/zencompute/zencompute.h11
-rw-r--r--src/zencompute/localrunner.cpp722
-rw-r--r--src/zencompute/localrunner.h100
-rw-r--r--src/zencompute/recordingreader.cpp335
-rw-r--r--src/zencompute/remotehttprunner.cpp457
-rw-r--r--src/zencompute/remotehttprunner.h80
-rw-r--r--src/zencompute/xmake.lua11
-rw-r--r--src/zencompute/zencompute.cpp12
-rw-r--r--src/zencore/include/zencore/system.h1
-rw-r--r--src/zencore/system.cpp169
-rw-r--r--src/zenhttp/servers/httpsys.cpp25
-rw-r--r--src/zennet/beacon.cpp170
-rw-r--r--src/zennet/include/zennet/beacon.h38
-rw-r--r--src/zennet/include/zennet/statsdclient.h2
-rw-r--r--src/zennet/statsdclient.cpp1
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp68
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h3
-rw-r--r--src/zenserver-test/function-tests.cpp34
-rw-r--r--src/zenserver/compute/computeserver.cpp330
-rw-r--r--src/zenserver/compute/computeserver.h106
-rw-r--r--src/zenserver/compute/computeservice.cpp100
-rw-r--r--src/zenserver/compute/computeservice.h36
-rw-r--r--src/zenserver/frontend/html/compute.html991
-rw-r--r--src/zenserver/main.cpp55
-rw-r--r--src/zenserver/storage/storageconfig.cpp1
-rw-r--r--src/zenserver/storage/storageconfig.h1
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp21
-rw-r--r--src/zenserver/storage/zenstorageserver.h26
-rw-r--r--src/zenserver/xmake.lua4
-rw-r--r--src/zenserver/zenserver.cpp8
-rw-r--r--src/zenserver/zenserver.h13
-rw-r--r--src/zentest-appstub/xmake.lua3
-rw-r--r--src/zentest-appstub/zentest-appstub.cpp391
50 files changed, 7853 insertions, 99 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
new file mode 100644
index 000000000..2d9d0d12e
--- /dev/null
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -0,0 +1,654 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "exec_cmd.h"
+
+#include <zencompute/functionservice.h>
+#include <zencompute/recordingreader.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/string.h>
+#include <zencore/timer.h>
+
+#include <EASTL/hash_map.h>
+#include <EASTL/hash_set.h>
+#include <EASTL/map.h>
+
+using namespace std::literals;
+
+namespace eastl {
+
+// Allows zen::IoHash to be used as a key in the EASTL hashed containers used further down in this
+// file (eastl::hash_set<IoHash>, eastl::hash_map<IoHash, ...>) by reusing IoHash's own hasher.
+template<>
+struct hash<zen::IoHash> : public zen::IoHash::Hasher
+{
+};
+
+} // namespace eastl
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen {
+
+// Registers the command line options understood by `zen exec`. Each option is bound directly to
+// the member that Run() and the execution-mode helpers read after ParseOptions().
+ExecCommand::ExecCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), "<hosturl>");
+ m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>");
+ // NOTE(review): the help text mentions a ".actionlog file", but Run() currently rejects any
+ // path that is not a directory.
+ m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>");
+ // offset/stride/limit select a subset of the recorded actions for replay (see ExecUsingSession)
+ m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>");
+ m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>");
+ m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>");
+ // NOTE(review): m_BeaconPath is not read anywhere in this translation unit - confirm whether
+ // BeaconExecute() is meant to use it.
+ m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>");
+ m_Options.add_option("",
+ "",
+ "mode",
+ "Select execution mode (http,inproc,dump,direct,beacon,buildlog)",
+ cxxopts::value(m_Mode)->default_value("http"),
+ "<string>");
+ m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>");
+ // The execution mode may also be given as the first positional argument
+ m_Options.parse_positional("mode");
+}
+
+// Nothing to release explicitly; members clean up after themselves.
+ExecCommand::~ExecCommand() = default;
+
+/**
+ * Entry point for `zen exec`.
+ *
+ * Parses the command line, opens the recording found at m_RecordingPath (which must be a
+ * directory), registers the functions described by the recorded workers and then hands over to
+ * the helper implementing the selected execution mode. The outcome is reported on the console.
+ *
+ * Throws OptionParseException when the recording path is missing or is not a directory, or when
+ * the mode name is not recognized.
+ */
+void
+ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+    // Configure
+
+    if (!ParseOptions(argc, argv))
+    {
+        return;
+    }
+
+    m_HostName = ResolveTargetHostSpec(m_HostName);
+
+    if (m_RecordingPath.empty())
+    {
+        throw OptionParseException("replay path is required!", m_Options.help());
+    }
+
+    m_VerboseLogging = GlobalOptions.IsVerbose;
+    m_QuietLogging   = m_Quiet && !m_VerboseLogging;  // verbose wins over quiet
+
+    enum ExecMode
+    {
+        kHttp,
+        kDirect,
+        kInproc,
+        kDump,
+        kBeacon,
+        kBuildLog
+    };
+
+    struct ModeName
+    {
+        std::string_view Name;
+        ExecMode         Mode;
+    };
+
+    // Must stay in sync with the help text of the "mode" option
+    static constexpr ModeName kModeTable[] = {
+        {"http"sv, kHttp},
+        {"direct"sv, kDirect},
+        {"inproc"sv, kInproc},
+        {"dump"sv, kDump},
+        {"beacon"sv, kBeacon},
+        {"buildlog"sv, kBuildLog},
+    };
+
+    const ModeName* SelectedMode = nullptr;
+
+    for (const ModeName& Candidate : kModeTable)
+    {
+        if (m_Mode == Candidate.Name)
+        {
+            SelectedMode = &Candidate;
+            break;
+        }
+    }
+
+    if (SelectedMode == nullptr)
+    {
+        throw OptionParseException("invalid mode specified!", m_Options.help());
+    }
+
+    const ExecMode Mode = SelectedMode->Mode;
+
+    // Gather information from recording path
+
+    std::unique_ptr<zen::compute::RecordingReader>   Reader;
+    std::unique_ptr<zen::compute::UeRecordingReader> UeReader;
+
+    // m_ChunkResolver and m_RecordingReader are non-owning pointers into the readers above, which
+    // only live for the duration of this call. Clear them on every exit path (including
+    // exceptions) so the members never dangle once the readers are destroyed.
+    auto ClearReaderPointers = MakeGuard([this] {
+        m_ChunkResolver   = nullptr;
+        m_RecordingReader = nullptr;
+    });
+
+    std::filesystem::path RecordingPath{m_RecordingPath};
+
+    if (!std::filesystem::is_directory(RecordingPath))
+    {
+        throw OptionParseException("replay path should be a directory path!", m_Options.help());
+    }
+
+    // A "cid" sub-directory selects RecordingReader; anything else is handed to UeRecordingReader
+    if (std::filesystem::is_directory(RecordingPath / "cid"))
+    {
+        Reader            = std::make_unique<zen::compute::RecordingReader>(RecordingPath);
+        m_WorkerMap       = Reader->ReadWorkers();
+        m_ChunkResolver   = Reader.get();
+        m_RecordingReader = Reader.get();
+    }
+    else
+    {
+        UeReader          = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath);
+        m_WorkerMap       = UeReader->ReadWorkers();
+        m_ChunkResolver   = UeReader.get();
+        m_RecordingReader = UeReader.get();
+    }
+
+    ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount());
+
+    for (const auto& [WorkerId, WorkerPackage] : m_WorkerMap)
+    {
+        CbObject WorkerDesc = WorkerPackage.GetObject();
+
+        RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId);
+
+        if (m_VerboseLogging)
+        {
+            zen::ExtendableStringBuilder<1024> ObjStr;
+# if 0
+            zen::CompactBinaryToJson(WorkerDesc, ObjStr);
+            ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr);
+# else
+            zen::CompactBinaryToYaml(WorkerDesc, ObjStr);
+            ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr);
+# endif
+        }
+    }
+
+    if (m_VerboseLogging)
+    {
+        EmitFunctionList(m_FunctionList);
+    }
+
+    // Iterate over work items and dispatch or log them
+
+    int ReturnValue = 0;
+
+    Stopwatch ExecTimer;
+
+    switch (Mode)
+    {
+        case kHttp:
+            // Forward requests to HTTP function service
+            ReturnValue = HttpExecute();
+            break;
+
+        case kDirect:
+            // Not currently supported
+            ReturnValue = LocalMessagingExecute();
+            break;
+
+        case kInproc:
+            // Handle execution in-core (by spawning child processes)
+            ReturnValue = InProcessExecute();
+            break;
+
+        case kDump:
+            // Dump high level information about actions to console
+            ReturnValue = DumpWorkItems();
+            break;
+
+        case kBeacon:
+            ReturnValue = BeaconExecute();
+            break;
+
+        case kBuildLog:
+            ReturnValue = BuildActionsLog();
+            break;
+
+        default:
+            ZEN_ERROR("Unknown operating mode! No work submitted");
+
+            ReturnValue = 1;
+            break;
+    }
+
+    ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs()));
+
+    if (!ReturnValue)
+    {
+        ZEN_CONSOLE("all work items completed successfully");
+    }
+    else
+    {
+        ZEN_CONSOLE("some work items failed (code {})", ReturnValue);
+    }
+}
+
+// "inproc" mode: replays the recording through a session backed by a local runner that uses a
+// ".zen_temp" directory (resolved against the current working directory).
+// Returns the result of ExecUsingSession().
+int
+ExecCommand::InProcessExecute()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Chunks = *m_ChunkResolver;
+
+    zen::compute::FunctionServiceSession Session(Chunks);
+
+    const std::filesystem::path ScratchDir = std::filesystem::absolute(".zen_temp");
+    Session.AddLocalRunner(Chunks, ScratchDir);
+
+    return ExecUsingSession(Session);
+}
+
+/**
+ * Replays the recorded actions through the given session and waits for all of them to finish.
+ *
+ * Workers are registered first, then every action selected by the offset/stride/limit options
+ * is enqueued. Completed jobs are drained as submission progresses, and afterwards polled until
+ * nothing is pending.
+ *
+ * Returns 0 when every selected action was submitted and its result retrieved, 1 otherwise.
+ *
+ * The action callback is handed to IterateActions() together with a count of 8 and the original
+ * code already guards the drain step and the pending set against concurrent use, so the callback
+ * is presumably invoked from several threads (confirm against RecordingReaderBase). All state
+ * shared with the callback is therefore atomic or lock protected.
+ */
+int
+ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession)
+{
+    ZEN_ASSERT(m_RecordingReader);
+    ZEN_ASSERT(m_ChunkResolver);
+
+    struct JobTracker
+    {
+    public:
+        inline void Insert(int LsnField)
+        {
+            RwLock::ExclusiveLockScope _(Lock);
+
+            // The job may already have been reported complete by a concurrent drain before we
+            // got here; in that case it must not be (re)added or we would wait for it forever.
+            if (CompletedEarly.erase(LsnField) == 0)
+            {
+                PendingJobs.insert(LsnField);
+            }
+        }
+
+        inline bool IsEmpty() const
+        {
+            RwLock::SharedLockScope _(Lock);
+            return PendingJobs.empty();
+        }
+
+        inline void Remove(int CompleteLsn)
+        {
+            RwLock::ExclusiveLockScope _(Lock);
+
+            if (PendingJobs.erase(CompleteLsn) == 0)
+            {
+                // Completion observed before Insert() - remember it
+                CompletedEarly.insert(CompleteLsn);
+            }
+        }
+
+        inline size_t GetSize() const
+        {
+            RwLock::SharedLockScope _(Lock);
+            return PendingJobs.size();
+        }
+
+    private:
+        mutable RwLock          Lock;
+        std::unordered_set<int> PendingJobs;
+        std::unordered_set<int> CompletedEarly;
+    };
+
+    JobTracker PendingJobs;
+
+    const size_t TotalWorkItems = m_RecordingReader->GetActionCount();
+
+    std::atomic<int>    FailedWorkCounter{0};
+    std::atomic<int>    SubmittedWorkItems{0};
+    std::atomic<size_t> RemainingWorkItems(TotalWorkItems);
+    std::atomic<int>    IsDraining{0};
+
+    auto DrainCompletedJobs = [&] {
+        // Only one drain at a time; concurrent callers simply skip
+        if (IsDraining.exchange(1))
+        {
+            return;
+        }
+
+        auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); });
+
+        CbObjectWriter Cbo;
+        FunctionSession.GetCompleted(Cbo);
+
+        if (CbObject Completed = Cbo.Save())
+        {
+            for (auto& It : Completed["completed"sv])
+            {
+                int32_t CompleteLsn = It.AsInt32();
+
+                CbPackage        ResultPackage;
+                HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage);
+
+                if (Response != HttpResponseCode::OK)
+                {
+                    // The session reported the job as completed but no result could be fetched.
+                    // Count it as a failure instead of leaving it pending, which would make the
+                    // wait loop below spin forever.
+                    ZEN_ERROR("failed to retrieve result for LSN {} (response code {})", CompleteLsn, static_cast<int>(Response));
+
+                    ++FailedWorkCounter;
+                }
+
+                PendingJobs.Remove(CompleteLsn);
+
+                if (Response == HttpResponseCode::OK)
+                {
+                    ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize());
+                }
+            }
+        }
+    };
+
+    // Describe workers
+
+    ZEN_CONSOLE("describing {} workers", m_WorkerMap.size());
+
+    for (const auto& Kv : m_WorkerMap)
+    {
+        FunctionSession.RegisterWorker(Kv.second);
+    }
+
+    // Then submit work items
+
+    ZEN_CONSOLE("submitting {} work items", TotalWorkItems);
+
+    // Guards the offset/stride bookkeeping and the "emit function list once" flag
+    RwLock ScheduleLock;
+
+    int       OffsetCounter = m_Offset;
+    const int Stride        = m_Stride > 0 ? m_Stride : 1;  // a stride below 1 would never schedule anything
+    int       StrideCounter = Stride;
+
+    auto ShouldSchedule = [&]() -> bool {
+        RwLock::ExclusiveLockScope _(ScheduleLock);
+
+        if (m_Limit > 0 && SubmittedWorkItems.load() >= m_Limit)
+        {
+            // Limit reached, ignore
+
+            return false;
+        }
+
+        if (OffsetCounter > 0)
+        {
+            // Still in offset, ignore
+
+            --OffsetCounter;
+
+            return false;
+        }
+
+        if (--StrideCounter == 0)
+        {
+            StrideCounter = Stride;
+
+            return true;
+        }
+
+        return false;
+    };
+
+    m_RecordingReader->IterateActions(
+        [&](CbObject ActionObject, const IoHash& ActionId) {
+            // Enqueue job
+
+            Stopwatch SubmitTimer;
+
+            const int Priority = 0;
+
+            if (ShouldSchedule())
+            {
+                if (m_VerboseLogging)
+                {
+                    int                     AttachmentCount = 0;
+                    uint64_t                AttachmentBytes = 0;
+                    eastl::hash_set<IoHash> ReferencedChunks;
+
+                    ActionObject.IterateAttachments([&](CbFieldView Field) {
+                        IoHash AttachData = Field.AsAttachment();
+
+                        ReferencedChunks.insert(AttachData);
+                        ++AttachmentCount;
+
+                        if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData))
+                        {
+                            AttachmentBytes += ChunkData.GetSize();
+                        }
+                    });
+
+                    zen::ExtendableStringBuilder<1024> ObjStr;
+                    zen::CompactBinaryToJson(ActionObject, ObjStr);
+                    ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}",
+                                ActionId,
+                                AttachmentCount,
+                                NiceBytes(AttachmentBytes),
+                                ObjStr);
+                }
+
+                if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult =
+                        FunctionSession.EnqueueAction(ActionObject, Priority))
+                {
+                    const int32_t LsnField = EnqueueResult.Lsn;
+
+                    const size_t Remaining      = --RemainingWorkItems;
+                    const int    SubmittedIndex = ++SubmittedWorkItems;
+
+                    if (!m_QuietLogging)
+                    {
+                        ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
+                                    SubmittedIndex,
+                                    LsnField,
+                                    NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+                                    Remaining);
+                    }
+
+                    PendingJobs.Insert(LsnField);
+                }
+                else
+                {
+                    if (!m_QuietLogging)
+                    {
+                        std::string_view FunctionName       = ActionObject["Function"sv].AsString();
+                        const Guid       FunctionVersion    = ActionObject["FunctionVersion"sv].AsUuid();
+                        const Guid       BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+                        ZEN_ERROR(
+                            "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work "
+                            "descriptor "
+                            "at: 'file://{}'",
+                            std::string(FunctionName),
+                            FunctionVersion,
+                            BuildSystemVersion,
+                            "<null>");
+
+                        {
+                            // m_FunctionListEmittedOnce is a plain bool
+                            RwLock::ExclusiveLockScope _(ScheduleLock);
+                            EmitFunctionListOnce(m_FunctionList);
+                        }
+                    }
+
+                    ++FailedWorkCounter;
+                }
+            }
+
+            // Check for completed work
+
+            DrainCompletedJobs();
+        },
+        8);
+
+    // Wait until all pending work is complete
+
+    while (!PendingJobs.IsEmpty())
+    {
+        // TODO: improve this logic
+        zen::Sleep(500);
+
+        DrainCompletedJobs();
+    }
+
+    return FailedWorkCounter.load() ? 1 : 0;
+}
+
+// "direct" mode: non-HTTP work submission path.
+//
+// To be reimplemented using final transport. Until then no work is submitted, so report failure
+// rather than letting Run() announce that all work items completed successfully.
+int
+ExecCommand::LocalMessagingExecute()
+{
+    ZEN_ERROR("'direct' execution mode is not currently supported, no work was submitted");
+
+    return 1;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// "http" mode: replays the recording through a session whose remote runner targets m_HostName.
+// Returns the result of ExecUsingSession().
+int
+ExecCommand::HttpExecute()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Chunks = *m_ChunkResolver;
+
+    const std::filesystem::path ScratchDir = std::filesystem::absolute(".zen_temp");
+
+    zen::compute::FunctionServiceSession Session(Chunks);
+    Session.AddRemoteRunner(Chunks, ScratchDir, m_HostName);
+
+    return ExecUsingSession(Session);
+}
+
+// "beacon" mode: replays the recording through a session whose remote runner targets a fixed
+// local endpoint. Returns the result of ExecUsingSession().
+//
+// NOTE(review): neither m_BeaconPath nor m_HostName is consulted here - confirm whether the
+// hard-coded endpoint is intentional.
+int
+ExecCommand::BeaconExecute()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver&              Chunks     = *m_ChunkResolver;
+    const std::filesystem::path ScratchDir = std::filesystem::absolute(".zen_temp");
+
+    zen::compute::FunctionServiceSession Session(Chunks);
+    Session.AddRemoteRunner(Chunks, ScratchDir, "http://localhost:8558");
+    // Session.AddRemoteRunner(Chunks, ScratchDir, "http://10.99.9.246:8558");
+
+    return ExecUsingSession(Session);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Appends one FunctionDefinition to m_FunctionList for every entry in the worker description's
+// "functions" array. Each entry contributes its "name" and "version"; the worker's
+// "buildsystem_version" and the given worker id are shared by all of them.
+void
+ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId)
+{
+    const Guid BuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid();
+
+    for (auto& Entry : WorkerDesc["functions"sv])
+    {
+        CbObjectView FunctionView = Entry.AsObjectView();
+
+        FunctionDefinition Definition;
+        Definition.FunctionName       = std::string{FunctionView["name"sv].AsString()};
+        Definition.FunctionVersion    = FunctionView["version"sv].AsUuid();
+        Definition.BuildSystemVersion = BuildSystemVersion;
+        Definition.WorkerId           = WorkerId;
+
+        m_FunctionList.push_back(std::move(Definition));
+    }
+}
+
+// Prints the function list the first time it is called; later calls do nothing.
+void
+ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList)
+{
+    if (m_FunctionListEmittedOnce)
+    {
+        return;
+    }
+
+    EmitFunctionList(FunctionList);
+
+    m_FunctionListEmittedOnce = true;
+}
+
+// "dump" mode: prints every recorded action as YAML together with its attachment count and the
+// compressed/uncompressed attachment sizes, then summarizes how many attachments are referenced
+// more than once across the recording. Always returns 0.
+int
+ExecCommand::DumpWorkItems()
+{
+    std::atomic<int> ActionsEmitted{0};
+
+    // Attachment CID -> number of times it was referenced across all actions
+    eastl::hash_map<IoHash, uint64_t> ReferenceCounts;
+
+    m_RecordingReader->IterateActions(
+        [&](CbObject ActionObject, const IoHash& ActionId) {
+            eastl::hash_map<IoHash, CompressedBuffer> ActionAttachments;
+
+            uint64_t CompressedBytes = 0;
+            uint64_t RawBytes        = 0;
+
+            ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+                const IoHash Cid = AttachmentField.GetValue().AsHash();
+
+                IoBuffer         ChunkData = m_ChunkResolver->FindChunkByCid(Cid);
+                IoHash           RawHash;
+                uint64_t         RawSize    = 0;
+                CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), RawHash, RawSize);
+
+                ActionAttachments[Cid] = Compressed;
+
+                CompressedBytes += Compressed.GetCompressedSize();
+                RawBytes += Compressed.DecodeRawSize();
+
+                // First sighting starts at one reference, later ones bump the existing count
+                auto InsertResult = ReferenceCounts.insert({Cid, 1});
+
+                if (!InsertResult.second)
+                {
+                    ++InsertResult.first->second;
+                }
+            });
+
+            zen::ExtendableStringBuilder<1024> ObjStr;
+
+# if 0
+            zen::CompactBinaryToJson(ActionObject, ObjStr);
+            ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, ActionAttachments.size(), ObjStr);
+# else
+            zen::CompactBinaryToYaml(ActionObject, ObjStr);
+            ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}",
+                        ActionId,
+                        ActionAttachments.size(),
+                        CompressedBytes,
+                        RawBytes,
+                        ObjStr);
+# endif
+
+            ++ActionsEmitted;
+        },
+        1);
+
+    ZEN_CONSOLE("emitted: {} actions", ActionsEmitted.load());
+
+    // Reference count -> attachments with exactly that many references (shared attachments only)
+    eastl::map<uint64_t, std::vector<IoHash>> SharedByCount;
+
+    for (const auto& [Cid, Count] : ReferenceCounts)
+    {
+        if (Count <= 1)
+        {
+            continue;
+        }
+
+        SharedByCount[Count].push_back(Cid);
+    }
+
+    for (const auto& [Count, SharedCids] : SharedByCount)
+    {
+        ZEN_CONSOLE("{} attachments with {} references", SharedCids.size(), Count);
+    }
+
+    return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// "buildlog" mode: intended to replay the recording through a session that records the actions
+// into a new log directory (m_RecordingLogPath, the --log option).
+//
+// Throws OptionParseException when no log directory was given or when it already exists.
+// NOTE(review): generation is not implemented yet; whether the statements following
+// ZEN_NOT_IMPLEMENTED are ever reached depends on that macro - confirm.
+int
+ExecCommand::BuildActionsLog()
+{
+    ZEN_ASSERT(m_ChunkResolver);
+    ChunkResolver& Resolver = *m_ChunkResolver;
+
+    // Run() has already validated m_RecordingPath; what this mode additionally needs is the
+    // output directory for the log.
+    if (m_RecordingLogPath.empty())
+    {
+        throw OptionParseException("need to specify recording log path (--log)", m_Options.help());
+    }
+
+    if (std::filesystem::exists(m_RecordingLogPath))
+    {
+        throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help());
+    }
+
+    ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!");
+
+    zen::compute::FunctionServiceSession FunctionSession(Resolver);
+    FunctionSession.StartRecording(Resolver, m_RecordingLogPath);
+
+    return ExecUsingSession(FunctionSession);
+}
+
+// Prints a table of the given function definitions to the console: a banner, a column header,
+// one row per function and a closing rule.
+void
+ExecCommand::EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList)
+{
+    ZEN_CONSOLE("=== Known functions:\n===========================");
+
+    ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id");
+
+    for (const auto& Definition : FunctionList)
+    {
+        ZEN_CONSOLE("{:30} {:36} {:36} {}",
+                    Definition.FunctionName,
+                    Definition.FunctionVersion,
+                    Definition.BuildSystemVersion,
+                    Definition.WorkerId);
+    }
+
+    ZEN_CONSOLE("===========================");
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zen/cmds/exec_cmd.h b/src/zen/cmds/exec_cmd.h
new file mode 100644
index 000000000..43d092144
--- /dev/null
+++ b/src/zen/cmds/exec_cmd.h
@@ -0,0 +1,97 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+#include <zencompute/recordingreader.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/guid.h>
+#include <zencore/iohash.h>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+namespace zen {
+class CbPackage;
+class CbObject;
+struct IoHash;
+class ChunkResolver;
+} // namespace zen
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+class FunctionServiceSession;
+}
+
+namespace zen {
+
+/**
+ * Zen CLI command for executing functions from a recording
+ *
+ * Mostly for testing and debugging purposes
+ */
+
+// NOTE(review): std::string and std::vector are used below but <string>/<vector> are not
+// included directly by this header - presumably they arrive via "../zen.h"; confirm.
+class ExecCommand : public ZenCmdBase
+{
+public:
+ ExecCommand();
+ ~ExecCommand();
+
+ // Command name and description as registered in the zen CLI command table
+ static constexpr char Name[] = "exec";
+ static constexpr char Description[] = "Execute functions from a recording";
+
+ virtual void Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ // Command line option storage (bound to cxxopts in the constructor)
+ cxxopts::Options m_Options{Name, Description};
+ std::string m_HostName;
+ std::filesystem::path m_BeaconPath;
+ std::filesystem::path m_RecordingPath;
+ std::filesystem::path m_RecordingLogPath;
+ int m_Offset = 0;
+ int m_Stride = 1;
+ int m_Limit = 0; // 0 means "no limit"
+ bool m_Quiet = false;
+ std::string m_Mode{"http"};
+
+ // One function exposed by a recorded worker
+ struct FunctionDefinition
+ {
+ std::string FunctionName;
+ zen::Guid FunctionVersion;
+ zen::Guid BuildSystemVersion;
+ zen::IoHash WorkerId;
+ };
+
+ bool m_FunctionListEmittedOnce = false;
+ void EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList);
+ void EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList);
+
+ // Worker id -> worker package, as read from the recording
+ std::unordered_map<zen::IoHash, zen::CbPackage> m_WorkerMap;
+ std::vector<FunctionDefinition> m_FunctionList;
+ bool m_VerboseLogging = false;
+ bool m_QuietLogging = false;
+
+ // Non-owning; Run() points these at a reader object that lives on its stack
+ zen::ChunkResolver* m_ChunkResolver = nullptr;
+ zen::compute::RecordingReaderBase* m_RecordingReader = nullptr;
+
+ void RegisterWorkerFunctionsFromDescription(const zen::CbObject& WorkerDesc, const zen::IoHash& WorkerId);
+
+ // Shared submit-and-wait loop used by the session based execution modes
+ int ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession);
+
+ // Execution modes
+
+ int DumpWorkItems();
+ int HttpExecute();
+ int InProcessExecute();
+ int LocalMessagingExecute();
+ int BeaconExecute();
+ int BuildActionsLog();
+};
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zen/xmake.lua b/src/zen/xmake.lua
index ab094fef3..f889c3296 100644
--- a/src/zen/xmake.lua
+++ b/src/zen/xmake.lua
@@ -6,15 +6,12 @@ target("zen")
add_files("**.cpp")
add_files("zen.cpp", {unity_ignored = true })
add_deps("zencore", "zenhttp", "zenremotestore", "zenstore", "zenutil")
+ add_deps("zencompute", "zennet")
add_deps("cxxopts", "fmt")
add_packages("json11")
add_includedirs(".")
set_symbols("debug")
- if is_mode("release") then
- set_optimize("fastest")
- end
-
if is_plat("windows") then
add_files("zen.rc")
add_ldflags("/subsystem:console,5.02")
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 25245c3d2..018f77738 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -11,6 +11,7 @@
#include "cmds/cache_cmd.h"
#include "cmds/copy_cmd.h"
#include "cmds/dedup_cmd.h"
+#include "cmds/exec_cmd.h"
#include "cmds/info_cmd.h"
#include "cmds/print_cmd.h"
#include "cmds/projectstore_cmd.h"
@@ -316,22 +317,25 @@ main(int argc, char** argv)
}
#endif // ZEN_WITH_TRACE
- AttachCommand AttachCmd;
- BenchCommand BenchCmd;
- BuildsCommand BuildsCmd;
- CacheDetailsCommand CacheDetailsCmd;
- CacheGetCommand CacheGetCmd;
- CacheGenerateCommand CacheGenerateCmd;
- CacheInfoCommand CacheInfoCmd;
- CacheStatsCommand CacheStatsCmd;
- CopyCommand CopyCmd;
- CopyStateCommand CopyStateCmd;
- CreateOplogCommand CreateOplogCmd;
- CreateProjectCommand CreateProjectCmd;
- DedupCommand DedupCmd;
- DownCommand DownCmd;
- DropCommand DropCmd;
- DropProjectCommand ProjectDropCmd;
+ AttachCommand AttachCmd;
+ BenchCommand BenchCmd;
+ BuildsCommand BuildsCmd;
+ CacheDetailsCommand CacheDetailsCmd;
+ CacheGetCommand CacheGetCmd;
+ CacheGenerateCommand CacheGenerateCmd;
+ CacheInfoCommand CacheInfoCmd;
+ CacheStatsCommand CacheStatsCmd;
+ CopyCommand CopyCmd;
+ CopyStateCommand CopyStateCmd;
+ CreateOplogCommand CreateOplogCmd;
+ CreateProjectCommand CreateProjectCmd;
+ DedupCommand DedupCmd;
+ DownCommand DownCmd;
+ DropCommand DropCmd;
+ DropProjectCommand ProjectDropCmd;
+#if ZEN_WITH_COMPUTE_SERVICES
+ ExecCommand ExecCmd;
+#endif // ZEN_WITH_COMPUTE_SERVICES
ExportOplogCommand ExportOplogCmd;
FlushCommand FlushCmd;
GcCommand GcCmd;
@@ -388,6 +392,9 @@ main(int argc, char** argv)
{"dedup", &DedupCmd, "Dedup files"},
{"down", &DownCmd, "Bring zen server down"},
{"drop", &DropCmd, "Drop cache namespace or bucket"},
+#if ZEN_WITH_COMPUTE_SERVICES
+ {ExecCommand::Name, &ExecCmd, ExecCommand::Description},
+#endif
{"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"},
{"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"},
{"gc", &GcCmd, "Garbage collect zen storage"},
diff --git a/src/zencompute-test/xmake.lua b/src/zencompute-test/xmake.lua
new file mode 100644
index 000000000..64a3c7703
--- /dev/null
+++ b/src/zencompute-test/xmake.lua
@@ -0,0 +1,9 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+-- Test runner executable for the zencompute library
+target("zencompute-test")
+ set_kind("binary")
+ set_group("tests")
+ add_headerfiles("**.h")
+ add_files("*.cpp")
+ add_deps("zencompute", "zencore")
+ add_packages("vcpkg::doctest")
diff --git a/src/zencompute-test/zencompute-test.cpp b/src/zencompute-test/zencompute-test.cpp
new file mode 100644
index 000000000..237812e12
--- /dev/null
+++ b/src/zencompute-test/zencompute-test.cpp
@@ -0,0 +1,32 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencompute/zencompute.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/zencore.h>
+
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+# include <sys/time.h>
+# include <sys/resource.h>
+# include <zencore/except.h>
+#endif
+
+#if ZEN_WITH_TESTS
+# define ZEN_TEST_WITH_RUNNER 1
+# include <zencore/testing.h>
+#endif
+
+// Test runner entry point. When built with ZEN_WITH_TESTS it force-links the zencompute tests,
+// initializes logging, raises the open file limit and runs the tests; otherwise it does nothing
+// and returns 0.
+int
+main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
+{
+#if ZEN_WITH_TESTS
+ zen::zencompute_forcelinktests();
+
+ zen::logging::InitializeLogging();
+ zen::MaximizeOpenFileCount();
+
+ return ZEN_RUN_TESTS(argc, argv);
+#else
+ return 0;
+#endif
+}
diff --git a/src/zencompute/actionrecorder.cpp b/src/zencompute/actionrecorder.cpp
new file mode 100644
index 000000000..04c4b5141
--- /dev/null
+++ b/src/zencompute/actionrecorder.cpp
@@ -0,0 +1,258 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "actionrecorder.h"
+
+#include "functionrunner.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# define ZEN_CONCRT_AVAILABLE 1
+#else
+# define ZEN_CONCRT_AVAILABLE 0
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+// Members are initialized in-class; files are opened later via Open().
+RecordingFileWriter::RecordingFileWriter() = default;
+
+// Finalizes the table of contents via Close() when the writer goes out of scope.
+RecordingFileWriter::~RecordingFileWriter()
+{
+ Close();
+}
+
+// Creates (truncating) the data file at FilePath and writes its 16 byte header, then creates the
+// companion table-of-contents file next to it (same name, ".ztoc" extension) and starts the
+// "toc" array that AppendObject() adds entries to.
+void
+RecordingFileWriter::Open(std::filesystem::path FilePath)
+{
+    static constexpr char     kDataHeader[]   = "----DDC2----DATA";
+    static constexpr uint64_t kDataHeaderSize = 16;
+
+    m_File.Open(FilePath, BasicFile::Mode::kTruncate);
+    m_File.Write(kDataHeader, kDataHeaderSize, 0);
+    m_FileOffset = kDataHeaderSize;
+
+    // FilePath is our own copy, so it can be turned into the TOC path in place
+    FilePath.replace_extension(".ztoc");
+    m_TocFile.Open(FilePath, BasicFile::Mode::kTruncate);
+
+    m_TocWriter << "version"sv << 1;
+    m_TocWriter.BeginArray("toc"sv);
+}
+
+// Finishes the "toc" array started by Open() and writes the table of contents to the TOC file.
+//
+// Safe to call on a writer that was never opened and safe to call more than once; this matters
+// because the destructor always calls Close(), including for writers whose Open() was never
+// reached. A failure to write the TOC is logged rather than thrown since this runs from the
+// destructor.
+void
+RecordingFileWriter::Close()
+{
+    // Open() is the only place that moves m_FileOffset past zero (to the end of the data file
+    // header), so a zero offset means there is no open "toc" array to finish.
+    // NOTE(review): an Open() that fails after the data file header was written still leaves the
+    // offset non-zero without an open array; a dedicated flag in the header would close that gap.
+    if (m_FileOffset == 0)
+    {
+        return;
+    }
+
+    // Mark as closed up front so a repeated Close() can never end the array twice
+    m_FileOffset = 0;
+
+    m_TocWriter.EndArray();
+    CbObject Toc = m_TocWriter.Save();
+
+    std::error_code Ec;
+    m_TocFile.WriteAll(Toc.GetBuffer().AsIoBuffer(), Ec);
+
+    if (Ec)
+    {
+        ZEN_WARN("failed writing recording table of contents: {}", Ec.message());
+    }
+}
+
+// Appends the object's serialized bytes to the data file and records a [hash, offset, size]
+// entry for it in the table of contents. Serialized against other appends by m_FileLock.
+// Throws std::system_error if the write to the data file fails.
+void
+RecordingFileWriter::AppendObject(const CbObject& Object, const IoHash& ObjectHash)
+{
+    RwLock::ExclusiveLockScope WriteScope(m_FileLock);
+
+    MemoryView Payload = Object.GetBuffer().GetView();
+
+    std::error_code WriteError;
+    m_File.Write(Payload, m_FileOffset, WriteError);
+
+    if (WriteError)
+    {
+        throw std::system_error(WriteError, "failed writing to archive");
+    }
+
+    const auto PayloadSize = Payload.GetSize();
+
+    // TOC entry: [hash, offset of the payload in the data file, payload size]
+    m_TocWriter.BeginArray();
+    m_TocWriter.AddHash(ObjectHash);
+    m_TocWriter.AddInteger(m_FileOffset);
+    m_TocWriter.AddInteger(gsl::narrow<int>(PayloadSize));
+    m_TocWriter.EndArray();
+
+    m_FileOffset += PayloadSize;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Prepares RecordingLogPath for a fresh recording: the directory is created if needed and then
+// cleaned, the worker and action archives are opened inside it, and a CID store rooted at its
+// "cid" sub-directory is initialized. Directory creation/cleaning failures are only logged.
+ActionRecorder::ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath)
+: m_ChunkResolver(InChunkResolver)
+, m_RecordingLogDir(RecordingLogPath)
+{
+ std::error_code Ec;
+ CreateDirectories(m_RecordingLogDir, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Could not create directory '{}': {}", m_RecordingLogDir, Ec.message());
+ }
+
+ // NOTE: cleans the target directory (read-only files included), so pointing this at a
+ // directory that already holds data is presumably destructive - confirm callers guard it
+ CleanDirectory(m_RecordingLogDir, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("Could not clean directory '{}': {}", m_RecordingLogDir, Ec.message());
+ }
+
+ m_WorkersFile.Open(m_RecordingLogDir / "workers.zdat");
+ m_ActionsFile.Open(m_RecordingLogDir / "actions.zdat");
+
+ CidStoreConfiguration CidConfig;
+ CidConfig.RootDirectory = m_RecordingLogDir / "cid";
+ CidConfig.HugeValueThreshold = 128 * 1024 * 1024;
+
+ m_CidStore.Initialize(CidConfig);
+}
+
+// Flushes the recording's CID store (via Shutdown()) before the members are destroyed.
+ActionRecorder::~ActionRecorder()
+{
+ Shutdown();
+}
+
+// Flushes the recording's CID store. Also invoked from the destructor.
+void
+ActionRecorder::Shutdown()
+{
+ m_CidStore.Flush();
+}
+
+// Records a worker: its descriptor object goes into the workers archive, and every chunk it
+// needs is copied into the recording's own CID store - first the attachments carried by the
+// package, then any chunk the descriptor references that the package did not carry (resolved
+// through the chunk resolver) - so the recording does not depend on the main CID store.
+void
+ActionRecorder::RegisterWorker(const CbPackage& WorkerPackage)
+{
+    const IoHash WorkerId = WorkerPackage.GetObjectHash();
+
+    m_WorkersFile.AppendObject(WorkerPackage.GetObject(), WorkerId);
+
+    std::unordered_set<IoHash> SeenChunks;
+    uint64_t                   NewBytes = 0;
+
+    // Pass 1: attachments shipped inside the worker package
+
+    for (const CbAttachment& Attachment : WorkerPackage.GetAttachments())
+    {
+        CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+        IoBuffer         Payload    = Compressed.GetCompressed().Flatten().AsIoBuffer();
+        const IoHash     RawHash    = Compressed.DecodeRawHash();
+
+        CidStore::InsertResult Inserted = m_CidStore.AddChunk(Payload, RawHash, CidStore::InsertMode::kCopyOnly);
+
+        SeenChunks.insert(RawHash);
+
+        if (Inserted.New)
+        {
+            NewBytes += Payload.GetSize();
+        }
+    }
+
+    // Pass 2: chunks referenced by the descriptor but not shipped in the package
+
+    CbObject Descriptor = WorkerPackage.GetObject();
+
+    Descriptor.IterateAttachments([&](const CbFieldView AttachmentField) {
+        const IoHash Cid = AttachmentField.GetValue().AsHash();
+
+        if (!SeenChunks.insert(Cid).second)
+        {
+            // Already handled
+            return;
+        }
+
+        IoBuffer Payload = m_ChunkResolver.FindChunkByCid(Cid);
+
+        if (!Payload)
+        {
+            ZEN_WARN("RegisterWorker: could not resolve attachment chunk {} for worker {}", Cid, WorkerId);
+            return;
+        }
+
+        CidStore::InsertResult Inserted = m_CidStore.AddChunk(Payload, Cid, CidStore::InsertMode::kCopyOnly);
+
+        if (Inserted.New)
+        {
+            NewBytes += Payload.GetSize();
+        }
+    });
+
+    ZEN_INFO("recorded worker {} with {} attachments ({} bytes)", WorkerId, SeenChunks.size(), NewBytes);
+}
+
+// Records one action: every attachment referenced by the action object is resolved and copied
+// into the recording's CID store, and - only if all of them could be resolved - the action
+// object itself is appended to the actions archive.
+//
+// Returns true when the action was recorded, false when at least one attachment was missing.
+// Attachments that did resolve are stored even when the action as a whole is rejected.
+bool
+ActionRecorder::RecordAction(Ref<RunnerAction> Action)
+{
+ bool AllGood = true;
+
+ Action->ActionObj.IterateAttachments([&](CbFieldView Field) {
+ IoHash AttachData = Field.AsHash();
+ IoBuffer ChunkData = m_ChunkResolver.FindChunkByCid(AttachData);
+
+ if (ChunkData)
+ {
+ if (ChunkData.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash DecompressedHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), /* out */ DecompressedHash, /* out*/ RawSize);
+
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize = 0;
+ if (Compressed.TryGetCompressParameters(/* out */ Compressor, /* out */ CompressionLevel, /* out */ BlockSize))
+ {
+ // Chunks whose container reports no compressor are re-encoded with
+ // Mermaid/Fast (reusing the reported block size) before being stored
+ if (Compressor == OodleCompressor::NotSet)
+ {
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ CompressedBuffer NewCompressed = CompressedBuffer::Compress(std::move(Decompressed),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::Fast,
+ BlockSize);
+
+ ChunkData = NewCompressed.GetCompressed().Flatten().AsIoBuffer();
+ }
+ }
+ }
+
+ const uint64_t ChunkSize = ChunkData.GetSize();
+
+ // The insert result is not inspected, so the counters below count every stored
+ // reference, whether or not the chunk was already present in the store
+ m_CidStore.AddChunk(ChunkData, AttachData, CidStore::InsertMode::kCopyOnly);
+ ++m_ChunkCounter;
+ m_ChunkBytesCounter.fetch_add(ChunkSize);
+ }
+ else
+ {
+ AllGood = false;
+
+ ZEN_WARN("could not resolve chunk {}", AttachData);
+ }
+ });
+
+ if (AllGood)
+ {
+ m_ActionsFile.AppendObject(Action->ActionObj, Action->ActionId);
+ ++m_ActionsCounter;
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/actionrecorder.h b/src/zencompute/actionrecorder.h
new file mode 100644
index 000000000..9cc2b44a2
--- /dev/null
+++ b/src/zencompute/actionrecorder.h
@@ -0,0 +1,91 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+#include <zencompute/zencompute.h>
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
+#include <zenstore/zenstore.h>
+
+#include <filesystem>
+#include <functional>
+#include <map>
+#include <unordered_map>
+
+namespace zen {
+class CbObject;
+class CbPackage;
+struct IoHash;
+} // namespace zen
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+// State for writing one recording file: a data file with a running offset, plus a
+// table-of-contents writer and file.
+// NOTE(review): the method bodies are not visible in this header; confirm the exact
+// on-disk layout against the implementation before relying on it.
+struct RecordingFileWriter
+{
+ // Move operations are explicitly deleted; declaring them also suppresses the
+ // implicitly generated copy operations, so instances are pinned in place
+ RecordingFileWriter(RecordingFileWriter&&) = delete;
+ RecordingFileWriter& operator=(RecordingFileWriter&&) = delete;
+
+ RwLock m_FileLock;
+ BasicFile m_File;
+ uint64_t m_FileOffset = 0;
+ CbObjectWriter m_TocWriter;
+ BasicFile m_TocFile;
+
+ RecordingFileWriter();
+ ~RecordingFileWriter();
+
+ void Open(std::filesystem::path FilePath);
+ void Close();
+ // Appends Object, identified by ObjectHash (callers in this change pass the action id)
+ void AppendObject(const CbObject& Object, const IoHash& ObjectHash);
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Recording "runner" implementation
+ *
+ * This class writes out all actions and their attachments to a recording directory
+ * in a format that can be read back by the RecordingReader.
+ *
+ * The contents of the recording directory will be self-contained, with all referenced
+ * attachments stored in the recording directory itself, so that the recording can be
+ * moved or shared without needing to maintain references to the main CID store.
+ *
+ */
+
+class ActionRecorder
+{
+public:
+ // InChunkResolver is held by reference and is used to look up attachment payloads
+ ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath);
+ ~ActionRecorder();
+
+ // Non-copyable
+ ActionRecorder(const ActionRecorder&) = delete;
+ ActionRecorder& operator=(const ActionRecorder&) = delete;
+
+ void Shutdown();
+ void RegisterWorker(const CbPackage& WorkerPackage);
+ // Returns false if any attachment of the action could not be resolved
+ bool RecordAction(Ref<RunnerAction> Action);
+
+private:
+ ChunkResolver& m_ChunkResolver;
+ std::filesystem::path m_RecordingLogDir;
+
+ RecordingFileWriter m_WorkersFile;
+ RecordingFileWriter m_ActionsFile;
+ // m_Gc must be declared before m_CidStore since the store is constructed from it
+ GcManager m_Gc;
+ CidStore m_CidStore{m_Gc};
+ // Statistics: attachments stored, attachment bytes stored, actions written
+ std::atomic<int> m_ChunkCounter{0};
+ std::atomic<uint64_t> m_ChunkBytesCounter{0};
+ std::atomic<int> m_ActionsCounter{0};
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/functionrunner.cpp b/src/zencompute/functionrunner.cpp
new file mode 100644
index 000000000..8e7c12b2b
--- /dev/null
+++ b/src/zencompute/functionrunner.cpp
@@ -0,0 +1,112 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "functionrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/filesystem.h>
+
+# include <fmt/format.h>
+# include <vector>
+
+namespace zen::compute {
+
+// Action dumps (see MaybeDumpAction) are written to the "actions" directory under BasePath
+FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions")
+{
+}
+
+// The destructor is declared pure virtual in the header and therefore needs this out-of-line definition
+FunctionRunner::~FunctionRunner() = default;
+
+// Default capacity for runners which do not override this: one action
+size_t
+FunctionRunner::QueryCapacity()
+{
+ return 1;
+}
+
+std::vector<SubmitResult>
+FunctionRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    // Default batch submit: forwards each action to SubmitAction() in order and
+    // returns one result per input action, at the matching index.
+
+    const size_t ActionCount = Actions.size();
+
+    std::vector<SubmitResult> Outcomes;
+    Outcomes.reserve(ActionCount);
+
+    for (size_t Index = 0; Index < ActionCount; ++Index)
+    {
+        Outcomes.push_back(SubmitAction(Actions[Index]));
+    }
+
+    return Outcomes;
+}
+
+void
+FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject)
+{
+    // When action dumping is enabled, writes the action object to
+    // "<actions path>/<lsn>.ddb". Does nothing otherwise.
+
+    if (!m_DumpActions)
+    {
+        return;
+    }
+
+    const std::string     FileName = fmt::format("{}.ddb", ActionLsn);
+    std::filesystem::path DumpPath = m_ActionsPath / FileName;
+
+    zen::WriteFile(DumpPath, IoBuffer(ActionObject.GetBuffer().AsIoBuffer()));
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// A new action starts in State::New; the time of creation is recorded as that state's timestamp
+RunnerAction::RunnerAction(FunctionServiceSession* OwnerSession) : m_OwnerSession(OwnerSession)
+{
+ this->Timestamps[static_cast<int>(State::New)] = DateTime::Now().GetTicks();
+}
+
+// Nothing to release explicitly; members clean up after themselves
+RunnerAction::~RunnerAction()
+{
+}
+
+void
+RunnerAction::SetActionState(State NewState)
+{
+    // Advances the action to NewState and notifies the owning session.
+    //
+    // States only ever move forward (New -> Pending -> Running -> Completed/Failed in
+    // enum order). A request for the current state or an earlier state is ignored and
+    // leaves both the state and the recorded timestamps untouched, so a late or
+    // duplicate notification cannot overwrite the time at which a state was entered.
+
+    ZEN_ASSERT(NewState < State::_Count);
+
+    State CurrentState = m_ActionState.load();
+
+    while (NewState > CurrentState)
+    {
+        // Stamp before publishing the state so that anyone who observes the new state
+        // also finds its timestamp in place
+        this->Timestamps[static_cast<int>(NewState)] = DateTime::Now().GetTicks();
+
+        // On failure compare_exchange_strong refreshes CurrentState with the value
+        // another thread stored, and the loop condition re-validates the transition
+        if (m_ActionState.compare_exchange_strong(CurrentState, NewState))
+        {
+            // Successful state change
+
+            m_OwnerSession->PostUpdate(this);
+
+            return;
+        }
+    }
+}
+
+// Takes ownership of the result package by moving it into the action
+void
+RunnerAction::SetResult(CbPackage&& Result)
+{
+ m_Result = std::move(Result);
+}
+
+// Returns a mutable reference to the stored result. Asserts that the action has
+// reached the Completed or Failed state.
+CbPackage&
+RunnerAction::GetResult()
+{
+ ZEN_ASSERT(IsCompleted());
+ return m_Result;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h
new file mode 100644
index 000000000..6fd0d84cc
--- /dev/null
+++ b/src/zencompute/functionrunner.h
@@ -0,0 +1,207 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <filesystem>
+# include <vector>
+
+namespace zen::compute {
+
+// Outcome of offering an action to a runner
+struct SubmitResult
+{
+ bool IsAccepted = false; // true if the runner took the action
+ std::string Reason; // optional explanation; empty by default
+};
+
+/** Base interface for classes implementing a remote execution "runner"
+ */
+class FunctionRunner : public RefCounted
+{
+ // Not movable (declared in the private section)
+ FunctionRunner(FunctionRunner&&) = delete;
+ FunctionRunner& operator=(FunctionRunner&&) = delete;
+
+public:
+ FunctionRunner(std::filesystem::path BasePath);
+ // Pure virtual to keep the class abstract; a definition is still provided in functionrunner.cpp
+ virtual ~FunctionRunner() = 0;
+
+ virtual void Shutdown() = 0;
+ virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;
+
+ [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
+ [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0;
+ [[nodiscard]] virtual bool IsHealthy() = 0;
+ // Default implementation returns 1
+ [[nodiscard]] virtual size_t QueryCapacity();
+ // Default implementation calls SubmitAction() once per action, in order
+ [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+
+protected:
+ std::filesystem::path m_ActionsPath;
+ bool m_DumpActions = false;
+ // Writes the action object to m_ActionsPath when m_DumpActions is set
+ void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject);
+};
+
+// A set of runners of one type. Calls are fanned out to every runner, except for
+// SubmitAction() which offers the action to one runner at a time, starting from a
+// rotating index. The runner list is guarded by m_RunnersLock.
+template<typename RunnerType>
+struct RunnerGroup
+{
+ // The group stores the runner in a Ref<>, so it shares ownership from here on
+ void AddRunner(RunnerType* Runner)
+ {
+ m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); });
+ }
+ // Sum of the capacities reported by all runners
+ size_t QueryCapacity()
+ {
+ size_t TotalCapacity = 0;
+ m_RunnersLock.WithSharedLock([&] {
+ for (const auto& Runner : m_Runners)
+ {
+ TotalCapacity += Runner->QueryCapacity();
+ }
+ });
+ return TotalCapacity;
+ }
+
+ // Offers the action to each runner in turn, starting at m_NextSubmitIndex, until
+ // one accepts or every runner has been tried once
+ SubmitResult SubmitAction(Ref<RunnerAction> Action)
+ {
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire);
+ int Index = InitialIndex;
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+
+ if (RunnerCount == 0)
+ {
+ return {.IsAccepted = false, .Reason = "No runners available"};
+ }
+
+ do
+ {
+ // Wrap the index into [0, RunnerCount)
+ while (Index >= RunnerCount)
+ {
+ Index -= RunnerCount;
+ }
+
+ auto& Runner = m_Runners[Index++];
+
+ SubmitResult Result = Runner->SubmitAction(Action);
+
+ if (Result.IsAccepted == true)
+ {
+ // The next submit starts with the runner after the one that accepted
+ m_NextSubmitIndex = Index % RunnerCount;
+
+ return Result;
+ }
+
+ // Wrap again so the termination test compares normalized indices
+ while (Index >= RunnerCount)
+ {
+ Index -= RunnerCount;
+ }
+ } while (Index != InitialIndex);
+
+ // Every runner declined; note that the individual rejection reasons are dropped
+ return {.IsAccepted = false};
+ }
+
+ // Sum of the submitted action counts reported by all runners
+ size_t GetSubmittedActionCount()
+ {
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ size_t TotalCount = 0;
+
+ for (const auto& Runner : m_Runners)
+ {
+ TotalCount += Runner->GetSubmittedActionCount();
+ }
+
+ return TotalCount;
+ }
+
+ // Registers the worker with every runner in the group
+ void RegisterWorker(CbPackage Worker)
+ {
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ for (auto& Runner : m_Runners)
+ {
+ Runner->RegisterWorker(Worker);
+ }
+ }
+
+ // Shuts down every runner in the group; the runners stay in the list
+ void Shutdown()
+ {
+ RwLock::SharedLockScope _(m_RunnersLock);
+
+ for (auto& Runner : m_Runners)
+ {
+ Runner->Shutdown();
+ }
+ }
+
+private:
+ RwLock m_RunnersLock;
+ std::vector<Ref<RunnerType>> m_Runners;
+ std::atomic<int> m_NextSubmitIndex{0}; // index of the runner the next SubmitAction() starts with
+};
+
+/**
+ * This represents an action going through different stages of scheduling and execution.
+ */
+struct RunnerAction : public RefCounted
+{
+ // OwnerSession receives a PostUpdate() call on every successful state change
+ explicit RunnerAction(FunctionServiceSession* OwnerSession);
+ ~RunnerAction();
+
+ int ActionLsn = 0; // sequence number assigned by the session when the action is enqueued
+ WorkerDesc Worker;
+ IoHash ActionId; // hash of ActionObj
+ CbObject ActionObj;
+ int Priority = 0;
+
+ // Declaration order matters: SetActionState() only allows moving to a state with a
+ // greater enumerator value
+ enum class State
+ {
+ New,
+ Pending,
+ Running,
+ Completed,
+ Failed,
+ _Count
+ };
+
+ // Human readable state name, "Unknown" for out-of-range values
+ static const char* ToString(State _)
+ {
+ switch (_)
+ {
+ case State::New:
+ return "New";
+ case State::Pending:
+ return "Pending";
+ case State::Running:
+ return "Running";
+ case State::Completed:
+ return "Completed";
+ case State::Failed:
+ return "Failed";
+ default:
+ return "Unknown";
+ }
+ }
+
+ // One tick value per state, indexed by the state's enumerator value; zero if never set
+ uint64_t Timestamps[static_cast<int>(State::_Count)] = {};
+
+ State ActionState() const { return m_ActionState; }
+ void SetActionState(State NewState);
+
+ bool IsSuccess() const { return ActionState() == State::Completed; }
+ // "Completed" here means finished, whether successfully or not
+ bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; }
+
+ void SetResult(CbPackage&& Result);
+ // Asserts IsCompleted()
+ CbPackage& GetResult();
+
+private:
+ std::atomic<State> m_ActionState = State::New;
+ FunctionServiceSession* m_OwnerSession = nullptr;
+ CbPackage m_Result;
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp
new file mode 100644
index 000000000..0698449e9
--- /dev/null
+++ b/src/zencompute/functionservice.cpp
@@ -0,0 +1,957 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+# include "actionrecorder.h"
+# include "localrunner.h"
+# include "remotehttprunner.h"
+
+# include <zencompute/recordingreader.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zentelemetry/stats.h>
+
+# include <set>
+# include <deque>
+# include <map>
+# include <thread>
+# include <unordered_map>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/hash_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Private implementation of FunctionServiceSession. Owns the worker registry, the
+// pending/running/results bookkeeping, the runner groups and the scheduling thread.
+struct FunctionServiceSession::Impl
+{
+ FunctionServiceSession* m_FunctionServiceSession;
+ ChunkResolver& m_ChunkResolver;
+ LoggerRef m_Log{logging::Get("apply")};
+
+ // The scheduling thread is started from the constructor body, i.e. after all
+ // members (including those declared further down) have been initialized
+ Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver)
+ : m_FunctionServiceSession(InFunctionServiceSession)
+ , m_ChunkResolver(InChunkResolver)
+ {
+ m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this};
+ }
+
+ void Shutdown();
+ bool IsHealthy();
+
+ LoggerRef Log() { return m_Log; }
+
+ std::atomic_bool m_AcceptActions = true;
+
+ // One entry per function exposed by a registered worker; used to map an action's
+ // function name and versions to a worker id
+ struct FunctionDefinition
+ {
+ std::string FunctionName;
+ Guid FunctionVersion;
+ Guid BuildSystemVersion;
+ IoHash WorkerId;
+ };
+
+ // Writes counters to Cbo; each container size is sampled under its own lock, so the
+ // values are not a single consistent snapshot
+ void EmitStats(CbObjectWriter& Cbo)
+ {
+ m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
+ m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
+ m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
+ Cbo << "actions_submitted"sv << GetSubmittedActionCount();
+ EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
+ }
+
+ void RegisterWorker(CbPackage Worker);
+ WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+
+ std::atomic<int32_t> m_ActionsCounter = 0; // sequence number
+
+ // Action bookkeeping, keyed by LSN. Where several of these locks are nested the
+ // visible code takes them in the order results -> pending -> running.
+
+ RwLock m_PendingLock;
+ std::map<int, Ref<RunnerAction>> m_PendingActions;
+
+ RwLock m_RunningLock;
+ std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
+
+ RwLock m_ResultsLock;
+ std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
+ metrics::Meter m_ResultRate;
+ std::atomic<uint64_t> m_RetiredCount{0};
+
+ HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+ HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+
+ std::atomic<bool> m_ShutdownRequested{false};
+
+ std::thread m_SchedulingThread;
+ std::atomic<bool> m_SchedulingThreadEnabled{true};
+ Event m_SchedulingThreadEvent;
+
+ void MonitorThreadFunction();
+ void SchedulePendingActions();
+
+ // Workers
+
+ RwLock m_WorkerLock; // guards m_WorkerMap and m_FunctionList
+ std::unordered_map<IoHash, CbPackage> m_WorkerMap;
+ std::vector<FunctionDefinition> m_FunctionList;
+ std::vector<IoHash> GetKnownWorkerIds();
+
+ // Runners
+
+ RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup;
+ RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup;
+
+ EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+ EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority);
+
+ void GetCompleted(CbWriter& Cbo);
+
+ // Recording
+
+ void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
+ void StopRecording();
+
+ std::unique_ptr<ActionRecorder> m_Recorder; // null when not recording
+
+ // History tracking
+
+ RwLock m_ActionHistoryLock;
+ std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory;
+ size_t m_HistoryLimit = 1000; // oldest entries are dropped beyond this many
+
+ std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit);
+
+ //
+
+ [[nodiscard]] size_t QueryCapacity();
+
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action);
+ [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+ [[nodiscard]] size_t GetSubmittedActionCount();
+
+ // Updates
+
+ // Actions whose state changed since the last HandleActionUpdates() pass
+ RwLock m_UpdatedActionsLock;
+ std::vector<Ref<RunnerAction>> m_UpdatedActions;
+
+ void HandleActionUpdates();
+ void PostUpdate(RunnerAction* Action);
+
+ void ShutdownRunners();
+};
+
+// Currently always reports healthy; no runner state is consulted
+bool
+FunctionServiceSession::Impl::IsHealthy()
+{
+ return true;
+}
+
+// Stops the scheduling thread and then shuts down all runners. The joinable() check
+// means a second call does not try to join the thread again.
+void
+FunctionServiceSession::Impl::Shutdown()
+{
+ m_AcceptActions = false;
+ m_ShutdownRequested = true;
+
+ // Disable the thread first, then wake it so it notices the flag promptly
+ m_SchedulingThreadEnabled = false;
+ m_SchedulingThreadEvent.Set();
+ if (m_SchedulingThread.joinable())
+ {
+ m_SchedulingThread.join();
+ }
+
+ ShutdownRunners();
+}
+
+// Shuts down the local runners first, then the remote ones
+void
+FunctionServiceSession::Impl::ShutdownRunners()
+{
+ m_LocalRunnerGroup.Shutdown();
+ m_RemoteRunnerGroup.Shutdown();
+}
+
+// Creates a new recorder writing to RecordingPath, replacing any existing recorder.
+// NOTE(review): m_Recorder is assigned without a lock while other functions read it;
+// confirm callers serialize this against enqueue/register calls.
+void
+FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
+{
+ ZEN_INFO("starting recording to '{}'", RecordingPath);
+
+ m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
+
+ ZEN_INFO("started recording to '{}'", RecordingPath);
+}
+
+// Destroys the current recorder, if any
+void
+FunctionServiceSession::Impl::StopRecording()
+{
+ ZEN_INFO("stopping recording");
+
+ m_Recorder = nullptr;
+
+ ZEN_INFO("stopped recording");
+}
+
+std::vector<FunctionServiceSession::ActionHistoryEntry>
+FunctionServiceSession::Impl::GetActionHistory(int Limit)
+{
+    // Returns a copy of the most recent history entries, oldest first. A positive
+    // Limit smaller than the history size restricts the result to the last Limit
+    // entries; any other value returns the whole history.
+
+    RwLock::SharedLockScope _(m_ActionHistoryLock);
+
+    auto       First = m_ActionHistory.begin();
+    const auto Last  = m_ActionHistory.end();
+
+    const bool IsTruncated = Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size();
+
+    if (IsTruncated)
+    {
+        First = m_ActionHistory.end() - Limit;
+    }
+
+    return std::vector<ActionHistoryEntry>(First, Last);
+}
+
+// Registers a worker package under the hash of its descriptor object. The first time
+// a given worker is seen it is also passed on to all runners and the recorder, and
+// its functions are added to the function list. The worker lock is held exclusively
+// for the whole call, including the calls into the runners.
+void
+FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker)
+{
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ const IoHash& WorkerId = Worker.GetObject().GetHash();
+
+ // insert_or_assign reports true in .second only when a new entry was inserted
+ if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second)
+ {
+ // Note that since the convention currently is that WorkerId is equal to the hash
+ // of the worker descriptor there is no chance that we get a second write with a
+ // different descriptor. Thus we only need to call this the first time, when the
+ // worker is added
+
+ m_LocalRunnerGroup.RegisterWorker(Worker);
+ m_RemoteRunnerGroup.RegisterWorker(Worker);
+
+ if (m_Recorder)
+ {
+ m_Recorder->RegisterWorker(Worker);
+ }
+
+ CbObject WorkerObj = Worker.GetObject();
+
+ // Populate worker database
+
+ const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid();
+
+ for (auto& Item : WorkerObj["functions"sv])
+ {
+ CbObjectView Function = Item.AsObjectView();
+
+ std::string_view FunctionName = Function["name"sv].AsString();
+ const Guid FunctionVersion = Function["version"sv].AsUuid();
+
+ m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
+ .FunctionVersion = FunctionVersion,
+ .BuildSystemVersion = WorkerBuildSystemVersion,
+ .WorkerId = WorkerId});
+ }
+ }
+}
+
+WorkerDesc
+FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+    // Looks up a registered worker by id. Returns a default-constructed WorkerDesc
+    // when the id is unknown.
+
+    RwLock::SharedLockScope _(m_WorkerLock);
+
+    const auto Entry = m_WorkerMap.find(WorkerId);
+
+    if (Entry == m_WorkerMap.end())
+    {
+        return {};
+    }
+
+    const CbPackage& Package = Entry->second;
+
+    return {Package, WorkerId};
+}
+
+std::vector<IoHash>
+FunctionServiceSession::Impl::GetKnownWorkerIds()
+{
+    // Returns the ids of all registered workers, in unspecified order.
+
+    std::vector<IoHash> WorkerIds;
+
+    m_WorkerLock.WithSharedLock([&] {
+        // The map may only be touched while the lock is held, and that includes
+        // reading its size: RegisterWorker() inserts under the exclusive lock
+        WorkerIds.reserve(m_WorkerMap.size());
+
+        for (const auto& [WorkerId, _] : m_WorkerMap)
+        {
+            WorkerIds.push_back(WorkerId);
+        }
+    });
+
+    return WorkerIds;
+}
+
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority)
+{
+    // Resolves the action's function name and versions to a registered worker and
+    // enqueues the action for it.
+    //
+    // On failure the result carries LSN 0 and an object which echoes the requested
+    // function specification together with an "error" message.
+
+    std::string_view FunctionName       = ActionObject["Function"sv].AsString();
+    const Guid       FunctionVersion    = ActionObject["FunctionVersion"sv].AsUuid();
+    const Guid       BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+    auto MakeErrorResult = [&](std::string_view Message) -> EnqueueResult {
+        CbObjectWriter Writer;
+
+        Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
+        Writer << "error" << Message;
+
+        return {0, Writer.Save()};
+    };
+
+    // Resolve function to worker
+    //
+    // Both the function list and the worker map are modified by RegisterWorker()
+    // under the exclusive worker lock, so they must be read under the shared lock.
+    // The package is copied out so the lock can be released before the action is
+    // submitted to a runner.
+
+    IoHash    WorkerId{IoHash::Zero};
+    CbPackage WorkerPackage;
+    bool      HaveWorkerPackage = false;
+
+    {
+        RwLock::SharedLockScope _(m_WorkerLock);
+
+        for (const FunctionDefinition& FuncDef : m_FunctionList)
+        {
+            if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion &&
+                FuncDef.BuildSystemVersion == BuildSystemVersion)
+            {
+                WorkerId = FuncDef.WorkerId;
+
+                if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
+                {
+                    WorkerPackage     = It->second;
+                    HaveWorkerPackage = true;
+                }
+
+                break;
+            }
+        }
+    }
+
+    if (WorkerId == IoHash::Zero)
+    {
+        return MakeErrorResult("no worker matches the action specification");
+    }
+
+    if (!HaveWorkerPackage)
+    {
+        return MakeErrorResult("no worker found despite match");
+    }
+
+    return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority);
+}
+
+// Creates a RunnerAction for an already resolved worker, assigns it the next LSN and
+// tries to hand it to a runner right away. If no runner accepts it the action is put
+// into the Pending state for the scheduling thread to retry. The result object
+// carries the LSN, worker id and action id.
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ // Pre-increment: the first action gets LSN 1, so 0 is never a valid LSN
+ const int ActionLsn = ++m_ActionsCounter;
+
+ Ref<RunnerAction> Pending{new RunnerAction(m_FunctionServiceSession)};
+
+ Pending->ActionLsn = ActionLsn;
+ Pending->Worker = Worker;
+ Pending->ActionId = ActionObj.GetHash();
+ Pending->ActionObj = ActionObj;
+ Pending->Priority = RequestPriority;
+
+ SubmitResult SubResult = SubmitAction(Pending);
+
+ if (SubResult.IsAccepted)
+ {
+ // Great, the job is being taken care of by the runner
+ ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn);
+ }
+ else
+ {
+ ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
+
+ // The state change posts an update, which is how the action ends up in the pending map
+ Pending->SetActionState(RunnerAction::State::Pending);
+ }
+
+ // The action is recorded regardless of whether a runner accepted it; the
+ // recorder's return value is not acted upon here
+ if (m_Recorder)
+ {
+ m_Recorder->RecordAction(Pending);
+ }
+
+ CbObjectWriter Writer;
+ Writer << "lsn" << Pending->ActionLsn;
+ Writer << "worker" << Pending->Worker.WorkerId;
+ Writer << "action" << Pending->ActionId;
+
+ return {Pending->ActionLsn, Writer.Save()};
+}
+
+SubmitResult
+FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action)
+{
+    // Local runners get first refusal; the remote runners are only asked when no
+    // local runner accepts the action. In that case the remote group's result is
+    // returned as-is, whether it accepted or not.
+    //
+    // Within each group runners are tried in a loosely round-robin order. With
+    // submits arriving from multiple threads the rotation is not exact, but it
+    // avoids always starting with the first runner.
+    //
+    // Longer term we should track the state of the individual
+    // runners and make decisions accordingly.
+
+    if (SubmitResult LocalResult = m_LocalRunnerGroup.SubmitAction(Action); LocalResult.IsAccepted)
+    {
+        return LocalResult;
+    }
+
+    return m_RemoteRunnerGroup.SubmitAction(Action);
+}
+
+// Total number of submitted actions reported by the local and remote runner groups
+size_t
+FunctionServiceSession::Impl::GetSubmittedActionCount()
+{
+ return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount();
+}
+
+// Retrieves the result of the action with the given LSN.
+//
+// Returns OK and moves the result package into OutResultPackage if the action has
+// finished; the entry is removed from the results map, so a result can be fetched
+// only once. Returns Accepted while the action is still pending or running, and
+// NotFound if the LSN is in none of the maps.
+HttpResponseCode
+FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ // This lock is held for the duration of the function since we need to
+ // be sure that the action doesn't change state while we are checking the
+ // different data structures
+
+ RwLock::ExclusiveLockScope _(m_ResultsLock);
+
+ if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
+ {
+ OutResultPackage = std::move(It->second->GetResult());
+
+ m_ResultsMap.erase(It);
+
+ return HttpResponseCode::OK;
+ }
+
+ {
+ RwLock::SharedLockScope __(m_PendingLock);
+
+ if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+
+ // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
+ // always be taken after m_ResultsLock if both are needed
+
+ {
+ RwLock::SharedLockScope __(m_RunningLock);
+
+ if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+
+ return HttpResponseCode::NotFound;
+}
+
+// Same contract as GetActionResult() but looks the action up by its id (the hash of
+// the action object) instead of its LSN. All three maps are keyed by LSN, so each of
+// them is scanned linearly; the first finished action with a matching id is returned
+// and removed.
+HttpResponseCode
+FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ // This lock is held for the duration of the function since we need to
+ // be sure that the action doesn't change state while we are checking the
+ // different data structures
+
+ RwLock::ExclusiveLockScope _(m_ResultsLock);
+
+ for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
+ {
+ if (It->second->ActionId == ActionId)
+ {
+ OutResultPackage = std::move(It->second->GetResult());
+
+ // Erasing invalidates It, which is fine since we return immediately
+ m_ResultsMap.erase(It);
+
+ return HttpResponseCode::OK;
+ }
+ }
+
+ {
+ RwLock::SharedLockScope __(m_PendingLock);
+
+ for (const auto& [K, Pending] : m_PendingActions)
+ {
+ if (Pending->ActionId == ActionId)
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+ }
+
+ // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
+ // always be taken after m_ResultsLock if both are needed
+
+ {
+ RwLock::SharedLockScope __(m_RunningLock);
+
+ for (const auto& [K, v] : m_RunningMap)
+ {
+ if (v->ActionId == ActionId)
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+ }
+
+ return HttpResponseCode::NotFound;
+}
+
+// Writes an array named "completed" holding the LSN of every action which currently
+// has a result waiting to be fetched. The order follows the unordered map's
+// iteration order.
+void
+FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo)
+{
+ Cbo.BeginArray("completed");
+
+ m_ResultsLock.WithSharedLock([&] {
+ for (auto& Kv : m_ResultsMap)
+ {
+ Cbo << Kv.first;
+ }
+ });
+
+ Cbo.EndArray();
+}
+
+# define ZEN_BATCH_SCHEDULER 1
+
+void
+FunctionServiceSession::Impl::SchedulePendingActions()
+{
+    // One scheduling pass: picks up to QueryCapacity() entries from the front of the
+    // pending map (lowest LSN first) and offers those in the Pending state to the
+    // runners. Submission happens outside the pending lock.
+    //
+    // In the batch path this function never modifies the pending or running maps
+    // itself; HandleActionUpdates() does that when the actions' state changes are
+    // posted. Actions no runner accepted simply stay pending for the next pass.
+
+    int    ScheduledCount = 0;
+    size_t RunningCount   = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+    size_t PendingCount   = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+    size_t ResultCount    = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
+
+    // Function-local static: shared by all sessions in the process
+    static Stopwatch DumpRunningTimer;
+
+    // Summary log on exit, plus a dump of the running LSNs at most every 30 seconds.
+    // Dismissed on the paths where nothing was attempted.
+    auto _ = MakeGuard([&] {
+        ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
+                 ScheduledCount,
+                 RunningCount,
+                 m_RetiredCount.load(),
+                 PendingCount,
+                 ResultCount);
+
+        if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
+        {
+            DumpRunningTimer.Reset();
+
+            std::set<int> RunningList;
+            m_RunningLock.WithSharedLock([&] {
+                for (auto& [K, V] : m_RunningMap)
+                {
+                    RunningList.insert(K);
+                }
+            });
+
+            ExtendableStringBuilder<1024> RunningString;
+            for (int i : RunningList)
+            {
+                if (RunningString.Size())
+                {
+                    RunningString << ", ";
+                }
+
+                RunningString.Append(IntNum(i));
+            }
+
+            ZEN_INFO("running: {}", RunningString);
+        }
+    });
+
+# if ZEN_BATCH_SCHEDULER
+    size_t Capacity = QueryCapacity();
+
+    if (!Capacity)
+    {
+        _.Dismiss();
+
+        return;
+    }
+
+    std::vector<Ref<RunnerAction>> ActionsToSchedule;
+
+    // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock
+
+    m_PendingLock.WithExclusiveLock([&] {
+        if (m_ShutdownRequested)
+        {
+            return;
+        }
+
+        if (m_PendingActions.empty())
+        {
+            return;
+        }
+
+        size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size());
+
+        auto       PendingIt  = m_PendingActions.begin();
+        const auto PendingEnd = m_PendingActions.end();
+
+        // Note that every inspected entry consumes one slot of the budget, including
+        // entries which turn out not to be in the Pending state
+        while (NumActionsToSchedule && PendingIt != PendingEnd)
+        {
+            const Ref<RunnerAction>& Pending = PendingIt->second;
+
+            switch (Pending->ActionState())
+            {
+                case RunnerAction::State::Pending:
+                    ActionsToSchedule.push_back(Pending);
+                    break;
+
+                case RunnerAction::State::Running:
+                case RunnerAction::State::Completed:
+                case RunnerAction::State::Failed:
+                    // Already picked up by a runner; the map entry is removed once
+                    // the corresponding update has been handled
+                    break;
+
+                default:
+                case RunnerAction::State::New:
+                    ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn);
+                    break;
+            }
+
+            ++PendingIt;
+            --NumActionsToSchedule;
+        }
+
+        PendingCount = m_PendingActions.size();
+    });
+
+    if (ActionsToSchedule.empty())
+    {
+        _.Dismiss();
+        return;
+    }
+
+    ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
+
+    auto SubmitResults = SubmitActions(ActionsToSchedule);
+
+    // Only submissions a runner actually accepted count as scheduled, so that the
+    // summary log does not report rejected attempts as progress
+
+    for (size_t i = 0; i < ActionsToSchedule.size() && i < SubmitResults.size(); ++i)
+    {
+        if (SubmitResults[i].IsAccepted)
+        {
+            ++ScheduledCount;
+        }
+    }
+
+# else
+    // NOTE(review): this non-batch path is compiled out (ZEN_BATCH_SCHEDULER is
+    // defined as 1 above) and looks stale: it uses pop_front() on what is now a
+    // std::map and refers to FunctionRunner::SubmitResult. Kept as-is.
+    m_PendingLock.WithExclusiveLock([&] {
+        while (!m_PendingActions.empty())
+        {
+            if (m_ShutdownRequested)
+            {
+                return;
+            }
+
+            // Here it would be good if we could decide to pop immediately to avoid
+            // holding the lock while creating processes etc
+            const Ref<RunnerAction>&     Pending   = m_PendingActions.begin()->second;
+            FunctionRunner::SubmitResult SubResult = SubmitAction(Pending);
+
+            if (SubResult.IsAccepted)
+            {
+                // Great, the job is being taken care of by the runner
+
+                ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn);
+
+                m_RunningLock.WithExclusiveLock([&] {
+                    m_RunningMap.insert({Pending->ActionLsn, Pending});
+
+                    RunningCount = m_RunningMap.size();
+                });
+
+                m_PendingActions.pop_front();
+
+                PendingCount = m_PendingActions.size();
+                ++ScheduledCount;
+            }
+            else
+            {
+                // Runner could not accept the job, leave it on the pending queue
+
+                return;
+            }
+        }
+    });
+# endif
+}
+
+// Body of the scheduling thread. Until the thread is disabled it repeatedly waits on
+// the scheduling event (with a timeout), processes posted action state updates and
+// then tries to schedule pending actions.
+void
+FunctionServiceSession::Impl::MonitorThreadFunction()
+{
+ SetCurrentThreadName("FunctionServiceSession_Monitor");
+
+ auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
+ do
+ {
+ // Poll more often while there is work waiting to be scheduled
+ int TimeoutMs = 1000;
+
+ if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }))
+ {
+ TimeoutMs = 100;
+ }
+
+ // NOTE(review): the meaning of Wait()'s return value is not visible here; confirm
+ // whether true means "timed out" or "signaled", since that decides when the
+ // event is reset below
+ const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs);
+
+ // Shutdown() clears this flag before signaling the event
+ if (m_SchedulingThreadEnabled == false)
+ {
+ return;
+ }
+
+ HandleActionUpdates();
+
+ // Schedule pending actions
+
+ SchedulePendingActions();
+
+ if (!Timedout)
+ {
+ m_SchedulingThreadEvent.Reset();
+ }
+ } while (m_SchedulingThreadEnabled);
+}
+
+// Queues an action whose state has changed for the next HandleActionUpdates() pass.
+// The queue holds a Ref<>, which keeps the action alive until it has been processed.
+// The scheduling event is not signaled here, so the update is picked up on the
+// scheduling thread's next wake-up.
+void
+FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action)
+{
+ m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); });
+}
+
+void
+FunctionServiceSession::Impl::HandleActionUpdates()
+{
+    // Drains the queue of posted state updates and moves each affected action into
+    // the map matching its current state: pending, running or results. Finished
+    // actions are also appended to the bounded action history.
+    //
+    // Where locks are nested they are taken in the order
+    // results -> pending -> running, matching GetActionResult()/FindActionResult().
+
+    std::vector<Ref<RunnerAction>> UpdatedActions;
+
+    // Take the whole batch in one go so the lock is not held while processing
+    m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); });
+
+    // An action may have been posted several times since the last pass. Only its
+    // current state matters, so each LSN is processed once per batch. (std::set is
+    // used since <set> is what this file includes directly.)
+    std::set<int> SeenLsn;
+
+    for (Ref<RunnerAction>& Action : UpdatedActions)
+    {
+        const int ActionLsn = Action->ActionLsn;
+
+        if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted)
+        {
+            switch (Action->ActionState())
+            {
+                case RunnerAction::State::Pending:
+                    m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+                    break;
+
+                case RunnerAction::State::Running:
+                    m_PendingLock.WithExclusiveLock([&] {
+                        m_RunningLock.WithExclusiveLock([&] {
+                            m_RunningMap.insert({ActionLsn, Action});
+                            m_PendingActions.erase(ActionLsn);
+                        });
+                    });
+                    ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
+                    break;
+
+                case RunnerAction::State::Completed:
+                case RunnerAction::State::Failed:
+                    m_ResultsLock.WithExclusiveLock([&] {
+                        m_ResultsMap[ActionLsn] = Action;
+
+                        m_PendingLock.WithExclusiveLock([&] {
+                            m_RunningLock.WithExclusiveLock([&] {
+                                // The action may have finished before its Running update was
+                                // processed, in which case it may still be in the pending map
+                                if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+                                {
+                                    m_PendingActions.erase(ActionLsn);
+                                }
+                                else
+                                {
+                                    m_RunningMap.erase(FindIt);
+                                }
+                            });
+                        });
+
+                        m_ActionHistoryLock.WithExclusiveLock([&] {
+                            ActionHistoryEntry Entry{.Lsn              = ActionLsn,
+                                                     .ActionId         = Action->ActionId,
+                                                     .WorkerId         = Action->Worker.WorkerId,
+                                                     .ActionDescriptor = Action->ActionObj,
+                                                     .Succeeded        = Action->ActionState() == RunnerAction::State::Completed};
+
+                            std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps));
+
+                            m_ActionHistory.push_back(std::move(Entry));
+
+                            // Keep the history bounded by dropping the oldest entry
+                            if (m_ActionHistory.size() > m_HistoryLimit)
+                            {
+                                m_ActionHistory.pop_front();
+                            }
+                        });
+                    });
+                    m_RetiredCount.fetch_add(1);
+                    m_ResultRate.Mark(1);
+                    ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
+                              Action->ActionId,
+                              ActionLsn,
+                              Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE");
+                    break;
+            }
+        }
+    }
+}
+
+// Combined capacity reported by the local and remote runner groups
+size_t
+FunctionServiceSession::Impl::QueryCapacity()
+{
+ return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity();
+}
+
+std::vector<SubmitResult>
+FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    // Submits each action individually via SubmitAction(), in order. The returned
+    // vector has exactly one result per input action, at the matching index.
+
+    std::vector<SubmitResult> Results;
+    Results.reserve(Actions.size());  // size is known up front, as in FunctionRunner::SubmitActions
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Results.push_back(SubmitAction(Action));
+    }
+
+    return Results;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Creating the implementation object also starts its scheduling thread
+FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver)
+{
+ m_Impl = std::make_unique<Impl>(this, InChunkResolver);
+}
+
+// Shuts the session down so the scheduling thread is joined before Impl is destroyed
+FunctionServiceSession::~FunctionServiceSession()
+{
+ Shutdown();
+}
+
+// Forwards to Impl::IsHealthy()
+bool
+FunctionServiceSession::IsHealthy()
+{
+ return m_Impl->IsHealthy();
+}
+
+// Forwards to Impl::Shutdown(); also called from the destructor
+void
+FunctionServiceSession::Shutdown()
+{
+ m_Impl->Shutdown();
+}
+
+// Forwards to Impl::StartRecording()
+void
+FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
+{
+ m_Impl->StartRecording(InResolver, RecordingPath);
+}
+
+// Forwards to Impl::StopRecording()
+void
+FunctionServiceSession::StopRecording()
+{
+ m_Impl->StopRecording();
+}
+
+// Forwards to Impl::EmitStats()
+void
+FunctionServiceSession::EmitStats(CbObjectWriter& Cbo)
+{
+ m_Impl->EmitStats(Cbo);
+}
+
+// Forwards to Impl::GetKnownWorkerIds()
+std::vector<IoHash>
+FunctionServiceSession::GetKnownWorkerIds()
+{
+ return m_Impl->GetKnownWorkerIds();
+}
+
+// Forwards to Impl::GetWorkerDescriptor()
+WorkerDesc
+FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+ return m_Impl->GetWorkerDescriptor(WorkerId);
+}
+
+// Creates a LocalProcessRunner and adds it to the local runner group, which holds it in a Ref<>
+void
+FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath)
+{
+ m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath));
+}
+
+// Creates a RemoteHttpRunner for HostName and adds it to the remote runner group, which holds it in a Ref<>
+void
+FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
+{
+ m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName));
+}
+
+// Forwards to Impl::EnqueueAction()
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority)
+{
+ return m_Impl->EnqueueAction(ActionObject, Priority);
+}
+
+// Forwards to Impl::EnqueueResolvedAction()
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority);
+}
+
+// Forwards to Impl::RegisterWorker()
+void
+FunctionServiceSession::RegisterWorker(CbPackage Worker)
+{
+ m_Impl->RegisterWorker(Worker);
+}
+
+// Forwards to Impl::GetActionResult()
+HttpResponseCode
+FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ return m_Impl->GetActionResult(ActionLsn, OutResultPackage);
+}
+
+// Forwards to Impl::FindActionResult()
+HttpResponseCode
+FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ return m_Impl->FindActionResult(ActionId, OutResultPackage);
+}
+
+// Forwards to Impl::GetActionHistory()
+std::vector<FunctionServiceSession::ActionHistoryEntry>
+FunctionServiceSession::GetActionHistory(int Limit)
+{
+ return m_Impl->GetActionHistory(Limit);
+}
+
+// Forwards to Impl::GetCompleted()
+void
+FunctionServiceSession::GetCompleted(CbWriter& Cbo)
+{
+ m_Impl->GetCompleted(Cbo);
+}
+
+// Forwards to Impl::PostUpdate(); called by RunnerAction::SetActionState() on every successful state change
+void
+FunctionServiceSession::PostUpdate(RunnerAction* Action)
+{
+ m_Impl->PostUpdate(Action);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Intentionally empty. NOTE(review): presumably referenced from elsewhere so the
+// linker keeps this translation unit; confirm against the call site.
+void
+function_forcelink()
+{
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp
new file mode 100644
index 000000000..09a9684a7
--- /dev/null
+++ b/src/zencompute/httpfunctionservice.cpp
@@ -0,0 +1,709 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httpfunctionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+// Route-capture matchers used by HttpFunctionService. They are only referenced from this
+// translation unit, so they are given internal linkage to avoid clashing with identically
+// named helpers defined in other source files.
+
+// Accepts captures made up solely of decimal digits (used for "{lsn}").
+static constinit AsciiSet g_DecimalSet("0123456789");
+static const auto         DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
+
+// Accepts captures that are exactly 40 hexadecimal characters (used for "{worker}" and "{action}").
+static constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");
+static const auto         IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
+
+/**
+ * Builds the HTTP front end for the compute function service.
+ *
+ * Adds a local runner rooted at `BaseDir / "local"`, registers this object as the "apply"
+ * stats handler and installs every route handled by this service.
+ *
+ * @param InCidStore    Chunk store used to look up and persist attachments; also handed to the
+ *                      function session and to the local runner.
+ * @param StatsService  Stats registry this service registers itself with under "apply".
+ * @param BaseDir       Root directory; "local" and "recording" sub-directories are derived from it.
+ */
+HttpFunctionService::HttpFunctionService(CidStore&                    InCidStore,
+                                         IHttpStatsService&           StatsService,
+                                         const std::filesystem::path& BaseDir)
+: m_CidStore(InCidStore)
+, m_StatsService(StatsService)
+, m_Log(logging::Get("apply"))
+, m_BaseDir(BaseDir)
+, m_FunctionService(InCidStore)
+{
+    m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local");
+
+    m_StatsService.RegisterHandler("apply", *this);
+
+    m_Router.AddMatcher("lsn", DecimalMatcher);
+    m_Router.AddMatcher("worker", IoHashMatcher);
+    m_Router.AddMatcher("action", IoHashMatcher);
+
+    // GET ready: health probe
+    m_Router.RegisterRoute(
+        "ready",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            if (m_FunctionService.IsHealthy())
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
+            }
+
+            return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
+        },
+        HttpVerb::kGet);
+
+    // GET workers: list of known worker ids
+    m_Router.RegisterRoute(
+        "workers",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("workers"sv);
+            for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds())
+            {
+                Cbo << WorkerId;
+            }
+            Cbo.EndArray();
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    // GET  workers/{worker}: fetch a worker descriptor
+    // POST workers/{worker}: register a worker, either as a bare spec (the response lists the
+    //                        chunks that still need uploading) or as a package with attachments
+    m_Router.RegisterRoute(
+        "workers/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId))
+                    {
+                        return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject());
+                    }
+                    return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+
+                case HttpVerb::kPost:
+                    {
+                        switch (HttpReq.RequestContentType())
+                        {
+                            case HttpContentType::kCbObject:
+                                {
+                                    CbObject WorkerSpec = HttpReq.ReadPayloadObject();
+
+                                    // Determine which pieces are missing and need to be transmitted
+
+                                    HashKeySet ChunkSet;
+
+                                    WorkerSpec.IterateAttachments([&](CbFieldView Field) {
+                                        const IoHash Hash = Field.AsHash();
+                                        ChunkSet.AddHashToSet(Hash);
+                                    });
+
+                                    CbPackage WorkerPackage;
+                                    WorkerPackage.SetObject(WorkerSpec);
+
+                                    m_CidStore.FilterChunks(ChunkSet);
+
+                                    if (ChunkSet.IsEmpty())
+                                    {
+                                        ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
+                                        m_FunctionService.RegisterWorker(WorkerPackage);
+                                        return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+                                    }
+
+                                    CbObjectWriter ResponseWriter;
+                                    ResponseWriter.BeginArray("need");
+
+                                    ChunkSet.IterateHashes([&](const IoHash& Hash) {
+                                        ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
+                                        ResponseWriter.AddHash(Hash);
+                                    });
+
+                                    ResponseWriter.EndArray();
+
+                                    ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
+                                }
+
+                            case HttpContentType::kCbPackage:
+                                {
+                                    CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage();
+                                    CbObject  WorkerSpec        = WorkerSpecPackage.GetObject();
+
+                                    std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments();
+
+                                    int      AttachmentCount      = 0;
+                                    int      NewAttachmentCount   = 0;
+                                    uint64_t TotalAttachmentBytes = 0;
+                                    uint64_t TotalNewBytes        = 0;
+
+                                    // Persist every attachment, tracking how much of it was new to the store
+                                    for (const CbAttachment& Attachment : Attachments)
+                                    {
+                                        ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                                        const IoHash     DataHash = Attachment.GetHash();
+                                        CompressedBuffer Buffer   = Attachment.AsCompressedBinary();
+
+                                        TotalAttachmentBytes += Buffer.GetCompressedSize();
+                                        ++AttachmentCount;
+
+                                        const CidStore::InsertResult InsertResult =
+                                            m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                                        if (InsertResult.New)
+                                        {
+                                            TotalNewBytes += Buffer.GetCompressedSize();
+                                            ++NewAttachmentCount;
+                                        }
+                                    }
+
+                                    ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+                                              WorkerId,
+                                              zen::NiceBytes(TotalAttachmentBytes),
+                                              AttachmentCount,
+                                              zen::NiceBytes(TotalNewBytes),
+                                              NewAttachmentCount);
+
+                                    m_FunctionService.RegisterWorker(WorkerSpecPackage);
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+                                }
+
+                            default:
+                                // Unsupported payload type: answer explicitly instead of leaving the request unanswered
+                                return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+                        }
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    // GET jobs/completed: completed actions plus current system metrics
+    m_Router.RegisterRoute(
+        "jobs/completed",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObjectWriter Cbo;
+            m_FunctionService.GetCompleted(Cbo);
+
+            SystemMetrics Sm = GetSystemMetricsForReporting();
+            Cbo.BeginObject("metrics");
+            Describe(Sm, Cbo);
+            Cbo.EndObject();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    // GET jobs/history?limit=N: most recent action history entries (limit defaults to 50)
+    m_Router.RegisterRoute(
+        "jobs/history",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq     = Req.ServerRequest();
+            const auto         QueryParams = HttpReq.GetQueryParams();
+
+            int QueryLimit = 50;
+
+            if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
+            {
+                QueryLimit = ParseInt<int>(LimitParam).value_or(50);
+            }
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("history");
+            for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit))
+            {
+                Cbo.BeginObject();
+                Cbo << "lsn"sv << Entry.Lsn;
+                Cbo << "actionId"sv << Entry.ActionId;
+                Cbo << "workerId"sv << Entry.WorkerId;
+                Cbo << "succeeded"sv << Entry.Succeeded;
+                Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
+
+                // The position of a timestamp within the array is the RunnerAction::State it belongs to
+                for (const auto& Timestamp : Entry.Timestamps)
+                {
+                    Cbo.AddInteger(
+                        fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))),
+                        Timestamp);
+                }
+                Cbo.EndObject();
+            }
+            Cbo.EndArray();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    // GET jobs/{lsn}: fetch the result of a scheduled action by its LSN
+    m_Router.RegisterRoute(
+        "jobs/{lsn}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            // The matcher only guarantees decimal digits, not that the value fits in an int,
+            // so parse defensively rather than letting a conversion exception escape the handler
+            const auto ActionLsn = ParseInt<int>(Req.GetCapture(1));
+
+            if (!ActionLsn.has_value())
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+            }
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    {
+                        CbPackage        Output;
+                        HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn.value(), Output);
+
+                        if (ResponseCode == HttpResponseCode::OK)
+                        {
+                            return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                        }
+
+                        return HttpReq.WriteResponse(ResponseCode);
+                    }
+
+                case HttpVerb::kPost:
+                    {
+                        // Add support for cancellation, priority changes
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "jobs/{worker}/{action}",  // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
+                                   // one which uses the scheduled action lsn for lookups
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq  = Req.ServerRequest();
+            const IoHash       ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+            CbPackage Output;
+            if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output);
+                ResponseCode != HttpResponseCode::OK)
+            {
+                ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
+
+                return HttpReq.WriteResponse(ResponseCode);
+            }
+
+            ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2))
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+        },
+        HttpVerb::kGet);
+
+    // POST jobs/{worker}?priority=N: submit an action for a specific, already registered worker
+    m_Router.RegisterRoute(
+        "jobs/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq  = Req.ServerRequest();
+            const IoHash       WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId);
+
+            if (!Worker)
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+            }
+
+            const auto QueryParams = Req.ServerRequest().GetQueryParams();
+
+            int RequestPriority = -1;
+
+            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+            {
+                RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+            }
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    // TODO: return status of all pending or executing jobs
+                    // (note: this route is currently registered for POST only)
+                    break;
+
+                case HttpVerb::kPost:
+                    switch (HttpReq.RequestContentType())
+                    {
+                        case HttpContentType::kCbObject:
+                            {
+                                // This operation takes the proposed job spec and identifies which
+                                // chunks are not present on this server. This list is then returned in
+                                // the "need" list in the response
+
+                                IoBuffer Payload   = HttpReq.ReadPayload();
+                                CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+                                std::vector<IoHash> NeedList;
+
+                                ActionObj.IterateAttachments([&](CbFieldView Field) {
+                                    const IoHash FileHash = Field.AsHash();
+
+                                    if (!m_CidStore.ContainsChunk(FileHash))
+                                    {
+                                        NeedList.push_back(FileHash);
+                                    }
+                                });
+
+                                if (NeedList.empty())
+                                {
+                                    // We already have everything, enqueue the action for execution
+
+                                    if (FunctionServiceSession::EnqueueResult Result =
+                                            m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+                                    {
+                                        ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);
+
+                                        return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                                    }
+                                    else
+                                    {
+                                        // Report the rejection in the same way as the "jobs" route does
+                                        return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                                    }
+                                }
+
+                                CbObjectWriter Cbo;
+                                Cbo.BeginArray("need");
+
+                                for (const IoHash& Hash : NeedList)
+                                {
+                                    Cbo << Hash;
+                                }
+
+                                Cbo.EndArray();
+                                CbObject Response = Cbo.Save();
+
+                                return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+                            }
+
+                        case HttpContentType::kCbPackage:
+                            {
+                                CbPackage Action    = HttpReq.ReadPayloadPackage();
+                                CbObject  ActionObj = Action.GetObject();
+
+                                std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+                                int      AttachmentCount      = 0;
+                                int      NewAttachmentCount   = 0;
+                                uint64_t TotalAttachmentBytes = 0;
+                                uint64_t TotalNewBytes        = 0;
+
+                                // Persist every attachment, tracking how much of it was new to the store
+                                for (const CbAttachment& Attachment : Attachments)
+                                {
+                                    ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                                    const IoHash     DataHash = Attachment.GetHash();
+                                    CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+                                    const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+                                    TotalAttachmentBytes += CompressedSize;
+                                    ++AttachmentCount;
+
+                                    const CidStore::InsertResult InsertResult =
+                                        m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                                    if (InsertResult.New)
+                                    {
+                                        TotalNewBytes += CompressedSize;
+                                        ++NewAttachmentCount;
+                                    }
+                                }
+
+                                if (FunctionServiceSession::EnqueueResult Result =
+                                        m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+                                {
+                                    ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
+                                              ActionObj.GetHash(),
+                                              Result.Lsn,
+                                              zen::NiceBytes(TotalAttachmentBytes),
+                                              AttachmentCount,
+                                              zen::NiceBytes(TotalNewBytes),
+                                              NewAttachmentCount);
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                                }
+                                else
+                                {
+                                    // Report the rejection in the same way as the "jobs" route does
+                                    return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                                }
+                            }
+
+                        default:
+                            // Unsupported payload type: answer explicitly instead of leaving the request unanswered
+                            return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kPost);
+
+    // POST jobs?priority=N: submit an action, letting the service resolve which worker runs it
+    m_Router.RegisterRoute(
+        "jobs",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            const auto QueryParams = HttpReq.GetQueryParams();
+
+            int RequestPriority = -1;
+
+            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+            {
+                RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+            }
+
+            switch (HttpReq.RequestContentType())
+            {
+                case HttpContentType::kCbObject:
+                    {
+                        // This operation takes the proposed job spec and identifies which
+                        // chunks are not present on this server. This list is then returned in
+                        // the "need" list in the response
+
+                        IoBuffer Payload   = HttpReq.ReadPayload();
+                        CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+                        std::vector<IoHash> NeedList;
+
+                        ActionObj.IterateAttachments([&](CbFieldView Field) {
+                            const IoHash FileHash = Field.AsHash();
+
+                            if (!m_CidStore.ContainsChunk(FileHash))
+                            {
+                                NeedList.push_back(FileHash);
+                            }
+                        });
+
+                        if (NeedList.empty())
+                        {
+                            // We already have everything, enqueue the action for execution
+
+                            if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
+                            {
+                                ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);
+
+                                return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                            }
+                            else
+                            {
+                                // Could not resolve?
+                                return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                            }
+                        }
+
+                        CbObjectWriter Cbo;
+                        Cbo.BeginArray("need");
+
+                        for (const IoHash& Hash : NeedList)
+                        {
+                            Cbo << Hash;
+                        }
+
+                        Cbo.EndArray();
+                        CbObject Response = Cbo.Save();
+
+                        return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+                    }
+
+                case HttpContentType::kCbPackage:
+                    {
+                        CbPackage Action    = HttpReq.ReadPayloadPackage();
+                        CbObject  ActionObj = Action.GetObject();
+
+                        std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+                        int      AttachmentCount      = 0;
+                        int      NewAttachmentCount   = 0;
+                        uint64_t TotalAttachmentBytes = 0;
+                        uint64_t TotalNewBytes        = 0;
+
+                        // Persist every attachment, tracking how much of it was new to the store
+                        for (const CbAttachment& Attachment : Attachments)
+                        {
+                            ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                            const IoHash     DataHash = Attachment.GetHash();
+                            CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+                            const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+                            TotalAttachmentBytes += CompressedSize;
+                            ++AttachmentCount;
+
+                            const CidStore::InsertResult InsertResult =
+                                m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                            if (InsertResult.New)
+                            {
+                                TotalNewBytes += CompressedSize;
+                                ++NewAttachmentCount;
+                            }
+                        }
+
+                        if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
+                        {
+                            ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
+                                      Result.Lsn,
+                                      zen::NiceBytes(TotalAttachmentBytes),
+                                      AttachmentCount,
+                                      zen::NiceBytes(TotalNewBytes),
+                                      NewAttachmentCount);
+
+                            return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                        }
+                        else
+                        {
+                            // Could not resolve?
+                            return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                        }
+                    }
+
+                default:
+                    // Unsupported payload type: answer explicitly instead of leaving the request unanswered
+                    return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+            }
+        },
+        HttpVerb::kPost);
+
+    // GET workers/all: every known worker id together with its descriptor object
+    m_Router.RegisterRoute(
+        "workers/all",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            std::vector<IoHash> WorkerIds = m_FunctionService.GetKnownWorkerIds();
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("workers");
+
+            for (const IoHash& WorkerId : WorkerIds)
+            {
+                Cbo.BeginObject();
+
+                Cbo << "id" << WorkerId;
+
+                const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId);
+
+                Cbo << "descriptor" << Descriptor.Descriptor.GetObject();
+
+                Cbo.EndObject();
+            }
+
+            Cbo.EndArray();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    // GET sysinfo: system metrics snapshot
+    m_Router.RegisterRoute(
+        "sysinfo",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            SystemMetrics Sm = GetSystemMetricsForReporting();
+
+            CbObjectWriter Cbo;
+            Describe(Sm, Cbo);
+
+            Cbo << "cpu_usage" << Sm.CpuUsagePercent;
+            Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
+            Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
+            // NOTE: the disk figures below are hard-coded constants, not measured values
+            Cbo << "disk_used" << 100 * 1024;
+            Cbo << "disk_total" << 100 * 1024 * 1024;
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    // POST record/start: begin recording into "<BaseDir>/recording"
+    m_Router.RegisterRoute(
+        "record/start",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording");
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK);
+        },
+        HttpVerb::kPost);
+
+    // POST record/stop: stop recording
+    m_Router.RegisterRoute(
+        "record/stop",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            m_FunctionService.StopRecording();
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK);
+        },
+        HttpVerb::kPost);
+}
+
+HttpFunctionService::~HttpFunctionService()
+{
+    // Undo the "apply" stats registration made by the constructor.
+    HttpFunctionService& Self = *this;
+    m_StatsService.UnregisterHandler("apply", Self);
+}
+
+void
+HttpFunctionService::Shutdown()
+{
+    // Shutting down the HTTP front end means shutting down the underlying function session.
+    FunctionServiceSession& Session = m_FunctionService;
+    Session.Shutdown();
+}
+
+const char*
+HttpFunctionService::BaseUri() const
+{
+    // URI prefix of this service.
+    const char* const Prefix = "/apply/";
+    return Prefix;
+}
+
+void
+HttpFunctionService::HandleRequest(HttpServerRequest& Request)
+{
+    // Time the whole request, including the case where no route matches.
+    metrics::OperationTiming::Scope RequestTimer(m_HttpRequests);
+
+    const bool WasRouted = m_Router.HandleRequest(Request);
+
+    if (!WasRouted)
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+void
+HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request)
+{
+    // Let the function session describe its stats, then send them back as a compact binary object.
+    CbObjectWriter StatsWriter;
+    m_FunctionService.EmitStats(StatsWriter);
+
+    CbObject Stats = StatsWriter.Save();
+    Request.WriteResponse(HttpResponseCode::OK, Stats);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+httpfunction_forcelink()  // Intentionally empty. NOTE(review): presumably referenced only so this translation unit gets linked - confirm.
+{
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/httporchestrator.cpp b/src/zencompute/httporchestrator.cpp
new file mode 100644
index 000000000..39e7e60d7
--- /dev/null
+++ b/src/zencompute/httporchestrator.cpp
@@ -0,0 +1,81 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httporchestrator.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/logging.h>
+
+namespace zen::compute {
+
+HttpOrchestratorService::HttpOrchestratorService() : m_Log(logging::Get("orch"))
+{
+    // POST provision: report every known worker with its URI and the time since it last announced itself
+    m_Router.RegisterRoute(
+        "provision",
+        [this](HttpRouterRequest& RouterReq) {
+            HttpServerRequest& ServerReq = RouterReq.ServerRequest();
+
+            CbObjectWriter Writer;
+            Writer.BeginArray("workers");
+
+            m_KnownWorkersLock.WithSharedLock([&] {
+                for (const auto& Entry : m_KnownWorkers)
+                {
+                    const auto& Info = Entry.second;
+
+                    Writer.BeginObject();
+                    Writer << "uri" << Info.BaseUri;
+                    Writer << "dt" << Info.LastSeen.GetElapsedTimeMs();
+                    Writer.EndObject();
+                }
+            });
+
+            Writer.EndArray();
+
+            ServerReq.WriteResponse(HttpResponseCode::OK, Writer.Save());
+        },
+        HttpVerb::kPost);
+
+    // POST announce: a worker reports its id and URI; both fields are required
+    m_Router.RegisterRoute(
+        "announce",
+        [this](HttpRouterRequest& RouterReq) {
+            HttpServerRequest& ServerReq = RouterReq.ServerRequest();
+
+            CbObject Payload = ServerReq.ReadPayloadObject();
+
+            std::string_view AnnouncedId  = Payload["id"].AsString("");
+            std::string_view AnnouncedUri = Payload["uri"].AsString("");
+
+            const bool IsComplete = !AnnouncedId.empty() && !AnnouncedUri.empty();
+
+            if (!IsComplete)
+            {
+                return ServerReq.WriteResponse(HttpResponseCode::BadRequest);
+            }
+
+            // Insert or refresh the entry for this worker
+            m_KnownWorkersLock.WithExclusiveLock([&] {
+                auto& Info   = m_KnownWorkers[std::string(AnnouncedId)];
+                Info.BaseUri = AnnouncedUri;
+                Info.LastSeen.Reset();
+            });
+
+            ServerReq.WriteResponse(HttpResponseCode::OK);
+        },
+        HttpVerb::kPost);
+}
+
+// Nothing to release explicitly; members clean up after themselves.
+HttpOrchestratorService::~HttpOrchestratorService() = default;
+
+const char*
+HttpOrchestratorService::BaseUri() const
+{
+    // URI prefix of this service.
+    const char* const Prefix = "/orch/";
+    return Prefix;
+}
+
+void
+HttpOrchestratorService::HandleRequest(HttpServerRequest& Request)
+{
+    // Dispatch through the router; log when nothing matched.
+    const bool WasRouted = m_Router.HandleRequest(Request);
+
+    if (!WasRouted)
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/functionservice.h b/src/zencompute/include/zencompute/functionservice.h
new file mode 100644
index 000000000..1deb99fd5
--- /dev/null
+++ b/src/zencompute/include/zencompute/functionservice.h
@@ -0,0 +1,132 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zenstore/zenstore.h>
+# include <zenhttp/httpcommon.h>
+
+# include <filesystem>
+
+namespace zen {
+class ChunkResolver;
+class CbObjectWriter;
+} // namespace zen
+
+namespace zen::compute {
+
+class ActionRecorder;
+class FunctionServiceSession;
+class IActionResultHandler;
+class LocalProcessRunner;
+class RemoteHttpRunner;
+struct RunnerAction;
+struct SubmitResult;
+
+struct WorkerDesc  // A worker's descriptor package together with the worker's id
+{
+ CbPackage Descriptor;
+ IoHash WorkerId{IoHash::Zero};  // stays IoHash::Zero until a real id is assigned
+
+ inline operator bool() const { return WorkerId != IoHash::Zero; }  // true when WorkerId differs from IoHash::Zero
+};
+
+/**
+ * Lambda style compute function service
+ *
+ * The responsibility of this class is to accept function execution requests, and
+ * schedule them using one or more FunctionRunner instances. It will basically always
+ * accept requests, queueing them if necessary, and then hand them off to runners
+ * as they become available.
+ *
+ * This is typically fronted by an API service that handles communication with clients.
+ */
+class FunctionServiceSession final
+{
+public:
+    FunctionServiceSession(ChunkResolver& InChunkResolver);
+    ~FunctionServiceSession();
+
+    void Shutdown();
+    bool IsHealthy();
+
+    // Worker registration and discovery
+
+    void                              RegisterWorker(CbPackage Worker);
+    [[nodiscard]] WorkerDesc          GetWorkerDescriptor(const IoHash& WorkerId);
+    [[nodiscard]] std::vector<IoHash> GetKnownWorkerIds();
+
+    // Action runners
+
+    void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath);
+    void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName);
+
+    // Action submission
+
+    /** Outcome of an enqueue request; converts to true when an LSN was assigned. */
+    struct EnqueueResult
+    {
+        // Zero-initialized so a default-constructed result reliably converts to false
+        int      Lsn = 0;
+        CbObject ResponseMessage;
+
+        inline operator bool() const { return Lsn != 0; }
+    };
+
+    [[nodiscard]] EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int Priority);
+    [[nodiscard]] EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+
+    // Completed action tracking
+
+    [[nodiscard]] HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+    [[nodiscard]] HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+
+    void GetCompleted(CbWriter&);
+
+    // Action history tracking (note that this is separate from completed action tracking, and
+    // will include actions which have been retired and no longer have their results available)
+
+    struct ActionHistoryEntry
+    {
+        // Scalar members are value-initialized, matching the zeroed Timestamps array
+        int      Lsn = 0;
+        IoHash   ActionId;
+        IoHash   WorkerId;
+        CbObject ActionDescriptor;
+        bool     Succeeded     = false;
+        uint64_t Timestamps[5] = {};
+    };
+
+    [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
+
+    // Stats reporting
+
+    void EmitStats(CbObjectWriter& Cbo);
+
+    // Recording
+
+    void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
+    void StopRecording();
+
+private:
+    void PostUpdate(RunnerAction* Action);
+
+    friend class FunctionRunner;
+    friend struct RunnerAction;
+
+    // Implementation is hidden behind a pointer so this header stays free of runner details
+    struct Impl;
+    std::unique_ptr<Impl> m_Impl;
+};
+
+void function_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httpfunctionservice.h b/src/zencompute/include/zencompute/httpfunctionservice.h
new file mode 100644
index 000000000..6e2344ae6
--- /dev/null
+++ b/src/zencompute/include/zencompute/httpfunctionservice.h
@@ -0,0 +1,73 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "zencompute/functionservice.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zentelemetry/stats.h>
+# include <zenhttp/httpserver.h>
+
+# include <deque>
+# include <filesystem>
+# include <unordered_map>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+class HttpFunctionService;
+class FunctionService;
+
+/**
+ * HTTP interface for compute function service
+ */
+class HttpFunctionService : public HttpService, public IHttpStatsProvider
+{
+public:
+ HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir);  // installs routes and registers the "apply" stats handler
+ ~HttpFunctionService();  // unregisters the "apply" stats handler
+
+ void Shutdown();  // shuts down the owned function session
+
+ virtual const char* BaseUri() const override;  // returns "/apply/"
+ virtual void HandleRequest(HttpServerRequest& Request) override;  // dispatches through m_Router; warns when no route matches
+
+ // IHttpStatsProvider
+
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;  // responds with the session's stats object
+
+protected:
+ CidStore& m_CidStore;  // not owned
+ IHttpStatsService& m_StatsService;  // not owned
+ LoggerRef Log() { return m_Log; }
+
+private:
+ LoggerRef m_Log;
+ std::filesystem ::path m_BaseDir;  // "local" and "recording" sub-directories are derived from this
+ HttpRequestRouter m_Router;
+ FunctionServiceSession m_FunctionService;
+
+ // Metrics
+
+ metrics::OperationTiming m_HttpRequests;  // scoped around every HandleRequest call
+};
+
+void httpfunction_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h
new file mode 100644
index 000000000..168c6d7fe
--- /dev/null
+++ b/src/zencompute/include/zencompute/httporchestrator.h
@@ -0,0 +1,44 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpserver.h>
+
+#include <unordered_map>
+
+namespace zen::compute {
+
+/**
+ * Mock orchestrator service, for testing dynamic provisioning
+ */
+
+class HttpOrchestratorService : public HttpService
+{
+public:
+    HttpOrchestratorService();
+    ~HttpOrchestratorService();
+
+    HttpOrchestratorService(const HttpOrchestratorService&) = delete;
+    HttpOrchestratorService& operator=(const HttpOrchestratorService&) = delete;
+
+    virtual const char* BaseUri() const override;
+    virtual void        HandleRequest(HttpServerRequest& Request) override;
+
+private:
+    HttpRequestRouter m_Router;
+    LoggerRef         m_Log;
+
+    /** What is remembered about a worker that has announced itself. */
+    struct KnownWorker
+    {
+        // Must own its storage: the value is assigned from a std::string_view obtained from the
+        // announce handler's local payload object, which does not outlive that handler, while
+        // this entry is read again by later requests.
+        std::string BaseUri;
+        Stopwatch   LastSeen;  // reset on every announce
+    };
+
+    // Guards m_KnownWorkers
+    RwLock                                       m_KnownWorkersLock;
+    std::unordered_map<std::string, KnownWorker> m_KnownWorkers;  // keyed by announced worker id
+};
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/recordingreader.h b/src/zencompute/include/zencompute/recordingreader.h
new file mode 100644
index 000000000..bf1aff125
--- /dev/null
+++ b/src/zencompute/include/zencompute/recordingreader.h
@@ -0,0 +1,127 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+#include <zencompute/zencompute.h>
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
+#include <zenstore/zenstore.h>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+namespace zen {
+class CbObject;
+class CbPackage;
+struct IoHash;
+} // namespace zen
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+class RecordingReaderBase  // Abstract, non-copyable interface implemented by the recording readers below
+{
+ RecordingReaderBase(const RecordingReaderBase&) = delete;
+ RecordingReaderBase& operator=(const RecordingReaderBase&) = delete;
+
+public:
+ RecordingReaderBase() = default;
+ virtual ~RecordingReaderBase() = 0;  // pure virtual destructor: a definition must still be provided elsewhere
+ virtual std::unordered_map<IoHash, CbPackage> ReadWorkers() = 0;  // worker packages keyed by hash
+ virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) = 0;  // invokes Callback with each action object and its id
+ virtual size_t GetActionCount() const = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Reader for recordings done via the zencompute recording system, which
+ * have a shared chunk store and a log of actions with pointers into the
+ * chunk store for their data.
+ */
+class RecordingReader : public RecordingReaderBase, public ChunkResolver
+{
+public:
+ explicit RecordingReader(const std::filesystem::path& RecordingPath);
+ ~RecordingReader();
+
+ virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;
+
+ virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
+ int TargetParallelism) override;
+ virtual size_t GetActionCount() const override;
+
+private:
+ std::filesystem::path m_RecordingLogDir;
+ BasicFile m_WorkerDataFile;
+ BasicFile m_ActionDataFile;
+ GcManager m_Gc;  // declared before m_CidStore, which is constructed from it
+ CidStore m_CidStore{m_Gc};
+
+ // ChunkResolver interface
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+ struct ActionEntry  // one recorded action: its id plus an offset/size pair (NOTE(review): presumably a range within the action data file - confirm)
+ {
+ IoHash ActionId;
+ uint64_t Offset;
+ uint64_t Size;
+ };
+
+ std::vector<ActionEntry> m_Actions;
+
+ void ScanActions();
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct LocalResolver : public ChunkResolver  // Non-copyable ChunkResolver holding a map from hash to buffer
+{
+ LocalResolver(const LocalResolver&) = delete;
+ LocalResolver& operator=(const LocalResolver&) = delete;
+
+ LocalResolver() = default;
+ ~LocalResolver() = default;
+
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ void Add(const IoHash& Cid, IoBuffer Data);
+
+private:
+ RwLock MapLock;  // NOTE(review): presumably guards Attachments - confirm in the implementation
+ std::unordered_map<IoHash, IoBuffer> Attachments;
+};
+
+/**
+ * This is a reader for UE/DDB recordings, which have a different layout on
+ * disk (no shared chunk store)
+ */
+class UeRecordingReader : public RecordingReaderBase, public ChunkResolver
+{
+public:
+ explicit UeRecordingReader(const std::filesystem::path& RecordingPath);
+ ~UeRecordingReader();
+
+ virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;
+ virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
+ int TargetParallelism) override;
+ virtual size_t GetActionCount() const override;
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;  // ChunkResolver interface
+
+private:
+ std::filesystem::path m_RecordingDir;
+ LocalResolver m_LocalResolver;
+ std::vector<std::filesystem::path> m_WorkDirs;
+
+ CbPackage ReadAction(std::filesystem::path WorkDir);  // returns the action package for the given work directory
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/zencompute.h b/src/zencompute/include/zencompute/zencompute.h
new file mode 100644
index 000000000..6dc32eeea
--- /dev/null
+++ b/src/zencompute/include/zencompute/zencompute.h
@@ -0,0 +1,11 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+namespace zen {
+
+void zencompute_forcelinktests();  // NOTE(review): presumably referenced so the module's tests get linked in - confirm
+
+}
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp
new file mode 100644
index 000000000..9a27f3f3d
--- /dev/null
+++ b/src/zencompute/localrunner.cpp
@@ -0,0 +1,722 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+/**
+ * Sets up a runner rooted at BaseDir.
+ *
+ * The concurrency limit is derived from the logical processor count, any content
+ * left behind in the actions, scratch and worker directories is removed, the
+ * monitor thread is started and finally the runner is opened for submissions.
+ */
+LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir)
+: FunctionRunner(BaseDir)
+, m_Log(logging::Get("local_exec"))
+, m_ChunkResolver(Resolver)
+, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers"))
+, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch"))
+{
+    SystemMetrics Metrics = GetSystemMetricsForReporting();
+
+    m_MaxRunningActions = Metrics.LogicalProcessorCount * 2;
+
+    ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions);
+
+    bool AnythingCleaned = false;
+
+    // Empties the given directory if it exists; failures are logged but not fatal
+    auto CleanIfPresent = [&](const std::filesystem::path& Dir) {
+        if (!std::filesystem::is_directory(Dir))
+        {
+            return;
+        }
+
+        ZEN_INFO("Cleaning '{}'", Dir);
+
+        std::error_code CleanEc;
+        CleanDirectory(Dir, /* ForceRemoveReadOnlyFiles */ true, CleanEc);
+
+        if (CleanEc)
+        {
+            ZEN_WARN("Unable to clean '{}': {}", Dir, CleanEc.message());
+        }
+
+        AnythingCleaned = true;
+    };
+
+    CleanIfPresent(m_ActionsPath);
+    CleanIfPresent(m_SandboxPath);
+
+    // We clean out all workers on startup since we can't know they are good. They could be bad
+    // due to tampering, malware (which I also mean to include AV and antimalware software) or
+    // other processes we have no control over
+    CleanIfPresent(m_WorkerPath);
+
+    if (AnythingCleaned)
+    {
+        ZEN_INFO("Cleanup complete");
+    }
+
+    m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
+
+# if ZEN_PLATFORM_WINDOWS
+    // Suppress any error dialogs caused by missing dependencies
+    UINT PreviousMode = ::SetErrorMode(0);
+    ::SetErrorMode(PreviousMode | SEM_FAILCRITICALERRORS);
+# endif
+
+    m_AcceptNewActions = true;
+}
+
+// Runs Shutdown(); a std::exception raised by it is logged instead of escaping the destructor
+LocalProcessRunner::~LocalProcessRunner()
+{
+    try
+    {
+        Shutdown();
+    }
+    catch (const std::exception& ShutdownError)
+    {
+        ZEN_WARN("exception during local process runner shutdown: {}", ShutdownError.what());
+    }
+}
+
+// Stops accepting work, stops and joins the monitor thread, then cancels whatever is
+// still running. The order matters: cancellation happens only once the monitor thread
+// has been joined.
+void
+LocalProcessRunner::Shutdown()
+{
+    m_AcceptNewActions.store(false);
+
+    // Ask the monitor thread to exit and wake it up so it notices promptly
+    m_MonitorThreadEnabled.store(false);
+    m_MonitorThreadEvent.Set();
+
+    if (m_MonitorThread.joinable())
+    {
+        m_MonitorThread.join();
+    }
+
+    CancelRunningActions();
+}
+
+// Creates a fresh directory below the scratch path, named after the next value of the
+// sandbox counter, and returns its path
+std::filesystem::path
+LocalProcessRunner::CreateNewSandbox()
+{
+    const int32_t            SandboxIndex = ++m_SandboxCounter;
+    std::filesystem::path NewSandbox   = m_SandboxPath / std::to_string(SandboxIndex);
+
+    zen::CreateDirectories(NewSandbox);
+
+    return NewSandbox;
+}
+
+// When action dumping is enabled, writes the worker descriptor, its manifested file
+// tree and its compressed chunks below the actions directory. Does nothing otherwise.
+void
+LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+    if (!m_DumpActions)
+    {
+        return;
+    }
+
+    CbObject      Descriptor = WorkerPackage.GetObject();
+    const IoHash& WorkerId   = WorkerPackage.GetObjectHash();
+
+    std::string           DumpName = fmt::format("worker_{}"sv, WorkerId);
+    std::filesystem::path DumpDir  = m_ActionsPath / DumpName;
+
+    zen::WriteFile(DumpDir / "worker.ucb", Descriptor.GetBuffer().AsIoBuffer());
+
+    // Every chunk referenced while manifesting is also stored in its compressed form
+    ManifestWorker(WorkerPackage, DumpDir / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) {
+        zen::WriteFile(DumpDir / "chunks" / Cid.ToHexString(), ChunkBuffer.GetCompressed());
+    });
+
+    ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, DumpDir);
+}
+
+// Estimates how many more actions may be submitted right now: zero when submissions
+// are closed or the limit is reached, otherwise the remaining head room
+size_t
+LocalProcessRunner::QueryCapacity()
+{
+    RwLock::SharedLockScope RunningScope{m_RunningLock};
+
+    if (!m_AcceptNewActions)
+    {
+        return 0;
+    }
+
+    const size_t Limit   = size_t(m_MaxRunningActions);
+    const size_t InFlight = m_RunningMap.size();
+
+    return (InFlight >= Limit) ? 0 : (Limit - InFlight);
+}
+
+// Submits every action in order and returns one result per action, at the same index
+// as the corresponding input
+std::vector<SubmitResult>
+LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    std::vector<SubmitResult> Results;
+    Results.reserve(Actions.size());  // exactly one result per action, avoid regrowth
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Results.push_back(SubmitAction(Action));
+    }
+
+    return Results;
+}
+
+/**
+ * Launches a single action in a new sandbox directory.
+ *
+ * The submission is rejected (IsAccepted == false) when the runner is not accepting
+ * work or the running-action limit has been reached. Otherwise the worker is
+ * manifested, the action and its input chunks are written into the sandbox and (on
+ * Windows) the worker process is spawned without waiting for it to finish.
+ *
+ * Throws std::runtime_error when an input chunk cannot be resolved, and whatever
+ * zen::ThrowLastError raises when the process cannot be created. In either case the
+ * sandbox directory created for the action is removed again before the exception
+ * leaves this function.
+ */
+SubmitResult
+LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+    // Verify whether we can accept more work
+
+    {
+        RwLock::SharedLockScope _{m_RunningLock};
+
+        if (!m_AcceptNewActions)
+        {
+            return SubmitResult{.IsAccepted = false};
+        }
+
+        if (m_RunningMap.size() >= size_t(m_MaxRunningActions))
+        {
+            return SubmitResult{.IsAccepted = false};
+        }
+    }
+
+    using namespace std::literals;
+
+    // Each enqueued action is assigned an integer index (logical sequence number),
+    // which we use as a key for tracking data structures and as an opaque id which
+    // may be used by clients to reference the scheduled action
+
+    const int32_t   ActionLsn = Action->ActionLsn;
+    const CbObject& ActionObj = Action->ActionObj;
+    const IoHash    ActionId  = ActionObj.GetHash();
+
+    MaybeDumpAction(ActionLsn, ActionObj);
+
+    std::filesystem::path SandboxPath = CreateNewSandbox();
+
+    // Until a process owns the sandbox, any early exit (exception) must remove it
+    // again, otherwise it stays on disk until the next service start
+    bool IsLaunched     = false;
+    auto SandboxCleanup = MakeGuard([&] {
+        if (IsLaunched)
+        {
+            return;
+        }
+
+        std::error_code Ec;
+        DeleteDirectories(SandboxPath, Ec);
+
+        if (Ec)
+        {
+            ZEN_WARN("Unable to delete sandbox directory '{}': {}", SandboxPath, Ec.message());
+        }
+    });
+
+    CbPackage WorkerPackage = Action->Worker.Descriptor;
+
+    std::filesystem::path WorkerPath = ManifestWorker(Action->Worker);
+
+    // Write out action
+
+    zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer());
+
+    // Manifest inputs in sandbox
+
+    ActionObj.IterateAttachments([&](CbFieldView Field) {
+        const IoHash          Cid = Field.AsHash();
+        std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()};
+        IoBuffer              DataBuffer = m_ChunkResolver.FindChunkByCid(Cid);
+
+        if (!DataBuffer)
+        {
+            throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid));
+        }
+
+        zen::WriteFile(FilePath, DataBuffer);
+    });
+
+# if ZEN_PLATFORM_WINDOWS
+    // Set up environment variables
+
+    StringBuilder<1024> EnvironmentBlock;
+
+    CbObject WorkerDescription = WorkerPackage.GetObject();
+
+    for (auto& It : WorkerDescription["environment"sv])
+    {
+        EnvironmentBlock.Append(It.AsString());
+        EnvironmentBlock.Append('\0');
+    }
+    EnvironmentBlock.Append('\0');
+    EnvironmentBlock.Append('\0');
+
+    // Execute process - this spawns the child process immediately without waiting
+    // for completion
+
+    std::string_view      ExecPath = WorkerDescription["path"sv].AsString();
+    std::filesystem::path ExePath  = WorkerPath / std::filesystem::path(ExecPath).make_preferred();
+
+    ExtendableWideStringBuilder<512> CommandLine;
+    CommandLine.Append(L'"');
+    CommandLine.Append(ExePath.c_str());
+    CommandLine.Append(L'"');
+    CommandLine.Append(L" -Build=build.action");
+
+    LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
+    LPSECURITY_ATTRIBUTES lpThreadAttributes  = nullptr;
+    BOOL                  bInheritHandles     = FALSE;
+    DWORD                 dwCreationFlags     = 0;
+
+    STARTUPINFO StartupInfo{};
+    StartupInfo.cb = sizeof StartupInfo;
+
+    PROCESS_INFORMATION ProcessInformation{};
+
+    ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str()));
+
+    CommandLine.EnsureNulTerminated();
+
+    BOOL Success = CreateProcessW(nullptr,
+                                  CommandLine.Data(),
+                                  lpProcessAttributes,
+                                  lpThreadAttributes,
+                                  bInheritHandles,
+                                  dwCreationFlags,
+                                  (LPVOID)EnvironmentBlock.Data(),  // Environment block
+                                  SandboxPath.c_str(),              // Current directory
+                                  &StartupInfo,
+                                  /* out */ &ProcessInformation);
+
+    if (!Success)
+    {
+        // TODO: this is probably not the best way to report failure. The return
+        // object should include a failure state and context
+
+        zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
+    }
+
+    // From here on the sandbox belongs to the running action and is cleaned up when
+    // the action is swept or cancelled
+    IsLaunched = true;
+
+    CloseHandle(ProcessInformation.hThread);
+
+    Ref<RunningAction> NewAction{new RunningAction()};
+    NewAction->Action        = Action;
+    NewAction->ProcessHandle = ProcessInformation.hProcess;
+    NewAction->SandboxPath   = std::move(SandboxPath);
+
+    {
+        RwLock::ExclusiveLockScope _(m_RunningLock);
+
+        m_RunningMap[ActionLsn] = std::move(NewAction);
+    }
+
+    // NOTE(review): the monitor thread can see the map entry before this state change
+    // is made; if the process exits immediately the Running state may be set after the
+    // final state - confirm whether SetActionState tolerates that ordering
+    Action->SetActionState(RunnerAction::State::Running);
+# else
+    ZEN_UNUSED(ActionId);
+
+    ZEN_NOT_IMPLEMENTED();
+# endif
+
+    return SubmitResult{.IsAccepted = true};
+}
+
+// Number of actions currently tracked as running (read under the shared lock)
+size_t
+LocalProcessRunner::GetSubmittedActionCount()
+{
+    RwLock::SharedLockScope RunningScope(m_RunningLock);
+
+    const size_t TrackedCount = m_RunningMap.size();
+
+    return TrackedCount;
+}
+
+/**
+ * Returns the directory holding the manifested files of the given worker, creating
+ * and populating it on first use.
+ *
+ * Existence of the directory is checked under the shared lock first and re-checked
+ * under the exclusive lock before manifesting. Should manifesting fail, the partially
+ * written directory is removed again (best effort) so that a later call does not
+ * mistake it for a complete worker; the original exception is rethrown.
+ */
+std::filesystem::path
+LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
+{
+    std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
+
+    {
+        RwLock::SharedLockScope SharedScope(m_WorkerLock);
+
+        if (std::filesystem::exists(WorkerDir))
+        {
+            return WorkerDir;
+        }
+    }
+
+    RwLock::ExclusiveLockScope ExclusiveScope(m_WorkerLock);
+
+    // Somebody else may have manifested the worker while no lock was held
+    if (!std::filesystem::exists(WorkerDir))
+    {
+        try
+        {
+            ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
+        }
+        catch (...)
+        {
+            std::error_code Ec;
+            DeleteDirectories(WorkerDir, Ec);
+
+            throw;
+        }
+    }
+
+    return WorkerDir;
+}
+
+/**
+ * Writes the decompressed content of one manifest file entry below SandboxRootPath.
+ *
+ * FileEntry supplies the "name", "hash" and "size" fields. The compressed data is
+ * taken from FromPackage when it carries an attachment with that hash, otherwise it
+ * is fetched through the chunk resolver. ChunkReferenceCallback is invoked with the
+ * hash and the compressed buffer before the file is written.
+ *
+ * Throws std::runtime_error when the chunk is missing, is not a valid compressed
+ * buffer, or (for resolver-provided chunks) its raw size differs from the entry.
+ */
+void
+LocalProcessRunner::DecompressAttachmentToFile(const CbPackage&                                      FromPackage,
+                                               CbObjectView                                          FileEntry,
+                                               const std::filesystem::path&                          SandboxRootPath,
+                                               std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback)
+{
+    std::string_view Name      = FileEntry["name"sv].AsString();
+    const IoHash     ChunkHash = FileEntry["hash"sv].AsHash();
+    const uint64_t   Size      = FileEntry["size"sv].AsUInt64();
+
+    CompressedBuffer Compressed;
+
+    if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
+    {
+        Compressed = Attachment->AsCompressedBinary();
+    }
+    else
+    {
+        IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash);
+
+        if (!DataBuffer)
+        {
+            throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash));
+        }
+
+        uint64_t DataRawSize = 0;
+        IoHash   DataRawHash;
+        Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize);
+
+        // Only compare sizes for a buffer which parsed successfully; report the raw
+        // size, since that is the value being compared against the entry
+        if (Compressed && DataRawSize != Size)
+        {
+            throw std::runtime_error(
+                fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataRawSize, Size));
+        }
+    }
+
+    if (!Compressed)
+    {
+        throw std::runtime_error(fmt::format("worker chunk '{}' is not a valid compressed buffer", ChunkHash));
+    }
+
+    ChunkReferenceCallback(ChunkHash, Compressed);
+
+    // NOTE(review): Name comes from the worker descriptor and is joined to the sandbox
+    // root unchecked; an absolute name or one containing ".." would land outside the
+    // sandbox - confirm descriptors are validated upstream
+    std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()};
+
+    SharedBuffer Decompressed = Compressed.Decompress();
+    zen::WriteFile(FilePath, Decompressed.AsIoBuffer());
+}
+
+/**
+ * Materializes a worker package below SandboxPath: first the "executables" entries,
+ * then the "dirs" directories, then the "files" entries, and finally the worker
+ * descriptor itself as "worker.zcb". ChunkReferenceCallback is passed on for every
+ * file entry which gets written.
+ */
+void
+LocalProcessRunner::ManifestWorker(const CbPackage&                                        WorkerPackage,
+                                   const std::filesystem::path&                            SandboxPath,
+                                   std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback)
+{
+    CbObject Descriptor = WorkerPackage.GetObject();
+
+    // Writes every file entry listed in the named descriptor array
+    auto WriteEntries = [&](std::string_view ArrayName) {
+        for (auto& Entry : Descriptor[ArrayName])
+        {
+            DecompressAttachmentToFile(WorkerPackage, Entry.AsObjectView(), SandboxPath, ChunkReferenceCallback);
+        }
+    };
+
+    WriteEntries("executables"sv);
+
+    for (auto& DirField : Descriptor["dirs"sv])
+    {
+        std::string_view DirName = DirField.AsString();
+        zen::CreateDirectories(SandboxPath / std::filesystem::path(DirName).make_preferred());
+    }
+
+    WriteEntries("files"sv);
+
+    WriteFile(SandboxPath / "worker.zcb", Descriptor.GetBuffer().AsIoBuffer());
+}
+
+/**
+ * Builds the result package of a finished action from its sandbox: "build.output"
+ * becomes the package object and every attachment it references is loaded from the
+ * "Outputs" directory.
+ *
+ * Throws std::system_error when a file cannot be read and std::runtime_error when an
+ * output chunk is not in compressed buffer format.
+ */
+CbPackage
+LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
+{
+    const std::filesystem::path ResultFile = SandboxPath / "build.output";
+    FileContents                ResultData = zen::ReadFile(ResultFile);
+
+    if (ResultData.ErrorCode)
+    {
+        throw std::system_error(ResultData.ErrorCode, fmt::format("Failed to read build output file '{}'", ResultFile));
+    }
+
+    CbObject  ResultObject = zen::LoadCompactBinaryObject(ResultData.Flatten());
+    CbPackage ResultPackage;
+
+    uint64_t CompressedTotal = 0;
+    uint64_t RawTotal        = 0;
+
+    const std::filesystem::path OutputsDir = SandboxPath / "Outputs";
+
+    ResultObject.IterateAttachments([&](CbFieldView Field) {
+        IoHash                ReferencedHash = Field.AsHash();
+        std::filesystem::path ChunkFile{OutputsDir / ReferencedHash.ToHexString()};
+        FileContents          ChunkContents = zen::ReadFile(ChunkFile);
+
+        if (ChunkContents.ErrorCode)
+        {
+            throw std::system_error(ChunkContents.ErrorCode, fmt::format("Failed to read build output file '{}'", ChunkFile));
+        }
+
+        IoHash           RawHash;
+        uint64_t         RawSize = 0;
+        CompressedBuffer Chunk   = CompressedBuffer::FromCompressed(SharedBuffer(ChunkContents.Flatten()), RawHash, RawSize);
+
+        if (!Chunk)
+        {
+            throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
+        }
+
+        CompressedTotal += Chunk.GetCompressedSize();
+        RawTotal += RawSize;
+
+        CbAttachment Attachment(std::move(Chunk), RawHash);
+        ResultPackage.AddAttachment(Attachment);
+    });
+
+    ResultPackage.SetObject(ResultObject);
+
+    ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
+              ResultPackage.GetAttachments().size(),
+              NiceBytes(CompressedTotal),
+              NiceBytes(RawTotal));
+
+    return ResultPackage;
+}
+
+/**
+ * Body of the monitor thread. Repeatedly waits (for at most one second at a time) and
+ * sweeps the running actions after every wait, until m_MonitorThreadEnabled is
+ * cleared.
+ */
+void
+LocalProcessRunner::MonitorThreadFunction()
+{
+    SetCurrentThreadName("LocalProcessRunner_Monitor");
+
+    auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
+    do
+    {
+        // On Windows it's possible to wait on process handles, so we wait for either a process to exit
+        // or for the monitor event to be signaled (which indicates we should check for cancellation
+        // or shutdown). This could be further improved by using a completion port and registering process
+        // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing
+        // with an enormous number of concurrent processes.
+        //
+        // On other platforms we just wait on the monitor event and poll for process exits at intervals.
+# if ZEN_PLATFORM_WINDOWS
+        auto WaitOnce = [&] {
+            HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS];
+
+            uint32_t NumHandles = 0;
+
+            WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle();
+
+            // Processes beyond the wait limit are only noticed by the periodic sweep
+            m_RunningLock.WithSharedLock([&] {
+                for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It)
+                {
+                    Ref<RunningAction> Action = It->second;
+
+                    WaitHandles[NumHandles++] = Action->ProcessHandle;
+                }
+            });
+
+            DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000);
+
+            // return true if a handle was signaled: a signaled handle is reported as
+            // WAIT_OBJECT_0 + index, with index in [0, NumHandles)
+            return (WaitResult < WAIT_OBJECT_0 + NumHandles);
+        };
+# else
+        auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); };
+# endif
+
+        while (!WaitOnce())
+        {
+            if (m_MonitorThreadEnabled == false)
+            {
+                return;
+            }
+
+            SweepRunningActions();
+        }
+
+        // Signal received
+
+        SweepRunningActions();
+    } while (m_MonitorThreadEnabled);
+}
+
+/**
+ * Takes over the whole running map, terminates every process in it, waits briefly for
+ * the terminated ones to exit, removes their sandboxes and marks their actions as
+ * failed. All process handles taken over are closed before returning.
+ */
+void
+LocalProcessRunner::CancelRunningActions()
+{
+    Stopwatch                                   Timer;
+    std::unordered_map<int, Ref<RunningAction>> RunningMap;
+
+    m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
+
+    if (RunningMap.empty())
+    {
+        return;
+    }
+
+    ZEN_INFO("cancelling all running actions");
+
+    // For expedience we initiate the process termination for all known
+    // processes before attempting to wait for them to exit.
+
+    std::vector<int> TerminatedLsnList;
+
+    for (const auto& Kv : RunningMap)
+    {
+        Ref<RunningAction> Action = Kv.second;
+
+        // Terminate running process
+
+# if ZEN_PLATFORM_WINDOWS
+        BOOL Success = TerminateProcess(Action->ProcessHandle, 222);
+
+        if (Success)
+        {
+            TerminatedLsnList.push_back(Kv.first);
+        }
+        else
+        {
+            DWORD LastError = GetLastError();
+
+            if (LastError != ERROR_ACCESS_DENIED)
+            {
+                ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError));
+            }
+        }
+# else
+        ZEN_NOT_IMPLEMENTED("need to implement process termination");
+# endif
+    }
+
+    // We only post results for processes we have terminated, in order
+    // to avoid multiple results getting posted for the same action
+
+    for (int Lsn : TerminatedLsnList)
+    {
+        if (auto It = RunningMap.find(Lsn); It != RunningMap.end())
+        {
+            Ref<RunningAction> Running = It->second;
+
+# if ZEN_PLATFORM_WINDOWS
+            if (Running->ProcessHandle != INVALID_HANDLE_VALUE)
+            {
+                DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000);
+
+                if (WaitResult != WAIT_OBJECT_0)
+                {
+                    ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult);
+                }
+                else
+                {
+                    ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
+                }
+            }
+# endif
+
+            // Clean up and post error result. The non-throwing overload is used so
+            // that one stubborn directory cannot prevent the remaining actions from
+            // being marked as failed.
+
+            std::error_code Ec;
+            DeleteDirectories(Running->SandboxPath, Ec);
+
+            if (Ec)
+            {
+                ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message());
+            }
+
+            Running->Action->SetActionState(RunnerAction::State::Failed);
+        }
+    }
+
+# if ZEN_PLATFORM_WINDOWS
+    // The entries were removed from m_RunningMap, so nothing else will ever close
+    // their process handles - release them here
+    for (const auto& Kv : RunningMap)
+    {
+        const Ref<RunningAction>& Running = Kv.second;
+
+        if (Running->ProcessHandle != nullptr && Running->ProcessHandle != INVALID_HANDLE_VALUE)
+        {
+            CloseHandle(Running->ProcessHandle);
+            Running->ProcessHandle = INVALID_HANDLE_VALUE;
+        }
+    }
+# endif
+
+    ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+/**
+ * Collects every running action whose process has exited, removes it from the running
+ * map and posts its result: for exit code zero the outputs are gathered and the
+ * action is marked completed, otherwise (or when gathering fails) the action is
+ * marked failed. The sandbox directory is removed in both cases.
+ */
+void
+LocalProcessRunner::SweepRunningActions()
+{
+    std::vector<Ref<RunningAction>> CompletedActions;
+
+    m_RunningLock.WithExclusiveLock([&] {
+        // TODO: It would be good to not hold the exclusive lock while making
+        // system calls and other expensive operations.
+
+        for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+        {
+            Ref<RunningAction> Action = It->second;
+
+# if ZEN_PLATFORM_WINDOWS
+            DWORD ExitCode  = 0;
+            BOOL  IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode);
+
+            if (IsSuccess && ExitCode != STILL_ACTIVE)
+            {
+                // Remember the exit code, it decides below whether outputs are gathered
+                Action->ExitCode = static_cast<int>(ExitCode);
+
+                CloseHandle(Action->ProcessHandle);
+                Action->ProcessHandle = INVALID_HANDLE_VALUE;
+
+                CompletedActions.push_back(std::move(Action));
+                It = m_RunningMap.erase(It);
+            }
+            else
+            {
+                ++It;
+            }
+# else
+            // TODO: implement properly for Mac/Linux
+
+            ZEN_UNUSED(Action);
+
+            // Always advance, otherwise a non-empty map would loop forever
+            ++It;
+# endif
+        }
+    });
+
+    // Notify outer. Note that this has to be done without holding any local locks
+    // otherwise we may end up with deadlocks.
+
+    for (Ref<RunningAction> Running : CompletedActions)
+    {
+        const int ActionLsn = Running->Action->ActionLsn;
+
+        if (Running->ExitCode == 0)
+        {
+            try
+            {
+                // Gather outputs
+
+                CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath);
+
+                Running->Action->SetResult(std::move(OutputPackage));
+                Running->Action->SetActionState(RunnerAction::State::Completed);
+
+                // We can delete the files at this point. The non-throwing overload is
+                // used so that a cleanup problem cannot turn the completed action into
+                // a failed one.
+                std::error_code DeleteEc;
+                DeleteDirectories(Running->SandboxPath, DeleteEc);
+
+                if (DeleteEc)
+                {
+                    ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath);
+                }
+
+                // Success -- continue with next iteration of the loop
+                continue;
+            }
+            catch (const std::exception& Ex)
+            {
+                ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+            }
+        }
+        else
+        {
+            ZEN_WARN("action lsn {} exited with code {}", ActionLsn, Running->ExitCode);
+        }
+
+        // Failed - for now this is indicated with an empty package in
+        // the results map. We can clean out the sandbox directory immediately.
+
+        std::error_code Ec;
+        DeleteDirectories(Running->SandboxPath, Ec);
+
+        if (Ec)
+        {
+            ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message());
+        }
+
+        Running->Action->SetActionState(RunnerAction::State::Failed);
+    }
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h
new file mode 100644
index 000000000..35f464805
--- /dev/null
+++ b/src/zencompute/localrunner.h
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/thread.h>
+# include <zencore/zencore.h>
+# include <zenstore/cidstore.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+
+# include <atomic>
+# include <filesystem>
+# include <thread>
+
+namespace zen {
+class CbPackage;
+}
+
+namespace zen::compute {
+
+/** Direct process spawner
+
+ This runner simply sets up a directory structure for each job and
+ creates a process to perform the computation in it. It is not very
+ efficient and is intended mostly for testing.
+
+ */
+
+class LocalProcessRunner : public FunctionRunner
+{
+ LocalProcessRunner(LocalProcessRunner&&) = delete;
+ LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;
+
+public:
+ // Cleans leftover directories below BaseDir and starts the monitor thread
+ LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir);
+ ~LocalProcessRunner();
+
+ // Stops accepting work, joins the monitor thread and cancels running actions
+ virtual void Shutdown() override;
+ // Dumps the worker to disk when action dumping is enabled, otherwise does nothing
+ virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ // Launches one action; the result tells whether it was accepted
+ [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ [[nodiscard]] virtual bool IsHealthy() override { return true; }
+ // Number of actions currently tracked as running
+ [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
+ // Number of additional actions which could be accepted right now
+ [[nodiscard]] virtual size_t QueryCapacity() override;
+ // Submits the actions one by one, returning one result per action
+ [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
+
+protected:
+ LoggerRef Log() { return m_Log; }
+
+ LoggerRef m_Log;
+
+ // Book-keeping for one spawned process
+ struct RunningAction : public RefCounted
+ {
+ Ref<RunnerAction> Action;
+ void* ProcessHandle = nullptr; // process handle as returned by CreateProcessW
+ int ExitCode = 0;
+ std::filesystem::path SandboxPath; // working directory of the process
+ };
+
+ std::atomic_bool m_AcceptNewActions; // set at the end of construction, cleared by Shutdown()
+ ChunkResolver& m_ChunkResolver;
+ RwLock m_WorkerLock; // serializes manifesting of worker directories
+ std::filesystem::path m_WorkerPath; // <BaseDir>/workers
+ std::atomic<int32_t> m_SandboxCounter = 0; // source of sandbox directory names
+ std::filesystem::path m_SandboxPath; // <BaseDir>/scratch
+ int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
+
+ // if used in conjunction with m_ResultsLock, this lock must be taken *after*
+ // m_ResultsLock to avoid deadlocks
+ // NOTE(review): no m_ResultsLock is declared in this class - confirm the comment is still current
+ RwLock m_RunningLock;
+ std::unordered_map<int, Ref<RunningAction>> m_RunningMap; // keyed by action LSN
+
+ std::thread m_MonitorThread;
+ std::atomic<bool> m_MonitorThreadEnabled{true};
+ Event m_MonitorThreadEvent; // signaled by Shutdown() to wake the monitor thread
+ void MonitorThreadFunction();
+ void SweepRunningActions();
+ void CancelRunningActions();
+
+ std::filesystem::path CreateNewSandbox();
+ // Writes the files, directories and descriptor of a worker package below SandboxPath
+ void ManifestWorker(const CbPackage& WorkerPackage,
+ const std::filesystem::path& SandboxPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
+ // Returns the (lazily created) directory holding the manifested worker
+ std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
+ // Builds the result package from the output files found in a sandbox
+ CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);
+
+ // Writes the decompressed content of one manifest file entry below SandboxRootPath
+ void DecompressAttachmentToFile(const CbPackage& FromPackage,
+ CbObjectView FileEntry,
+ const std::filesystem::path& SandboxRootPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp
new file mode 100644
index 000000000..1c1a119cf
--- /dev/null
+++ b/src/zencompute/recordingreader.cpp
@@ -0,0 +1,335 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/recordingreader.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# define ZEN_CONCRT_AVAILABLE 1
+#else
+# define ZEN_CONCRT_AVAILABLE 0
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+// File names which mark an action directory and a worker directory in a recording.
+// They are spelled as wide literals on Windows and narrow literals elsewhere.
+// NOTE(review): presumably this matches the native character type of path_view - confirm
+# if ZEN_PLATFORM_WINDOWS
+# define ZEN_BUILD_ACTION L"Build.action"
+# define ZEN_WORKER_UCB L"worker.ucb"
+# else
+# define ZEN_BUILD_ACTION "Build.action"
+# define ZEN_WORKER_UCB "worker.ucb"
+# endif
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Tree visitor which records the parent directory of every "Build.action" file
+ * (WorkDirs) and of every "worker.ucb" file (WorkerDirs). All directories are
+ * descended into.
+ */
+struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor
+{
+    virtual void VisitFile(const std::filesystem::path& Parent,
+                           const path_view&             File,
+                           uint64_t                     FileSize,
+                           uint32_t                     NativeModeOrAttributes,
+                           uint64_t                     NativeModificationTick)
+    {
+        ZEN_UNUSED(FileSize, NativeModeOrAttributes, NativeModificationTick);
+
+        const bool IsActionFile = (File.compare(path_view(ZEN_BUILD_ACTION)) == 0);
+
+        if (IsActionFile)
+        {
+            WorkDirs.push_back(Parent);
+            return;
+        }
+
+        if (File.compare(path_view(ZEN_WORKER_UCB)) == 0)
+        {
+            WorkerDirs.push_back(Parent);
+        }
+    }
+
+    virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes)
+    {
+        ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes);
+
+        // Always recurse
+        return true;
+    }
+
+    std::vector<std::filesystem::path> WorkerDirs;  // directories holding a worker.ucb
+    std::vector<std::filesystem::path> WorkDirs;    // directories holding a Build.action
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Calls Func for every element of Array.
+ *
+ * When the Concurrency Runtime is available and TargetParallelism is greater than
+ * one, the elements are processed in parallel, split into chunks of roughly
+ * size / TargetParallelism elements. Otherwise they are visited sequentially in
+ * order. Func must be safe to call concurrently in the parallel case.
+ *
+ * Array is taken by reference so that the container is not copied on every call.
+ */
+void
+IterateOverArray(const auto& Array, auto Func, int TargetParallelism)
+{
+# if ZEN_CONCRT_AVAILABLE
+    if (TargetParallelism > 1 && !Array.empty())
+    {
+        // The partitioner rejects a chunk size of zero, which the plain division
+        // yields whenever there are fewer elements than TargetParallelism
+        const size_t ChunkSize = Array.size() / size_t(TargetParallelism);
+
+        concurrency::simple_partitioner Chunker(ChunkSize > 0 ? ChunkSize : 1);
+        concurrency::parallel_for_each(
+            begin(Array),
+            end(Array),
+            [&](const auto& Item) { Func(Item); },
+            Chunker);
+
+        return;
+    }
+# else
+    ZEN_UNUSED(TargetParallelism);
+# endif
+
+    for (const auto& Item : Array)
+    {
+        Func(Item);
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Defaulted here, out of line, rather than in the header
+RecordingReaderBase::~RecordingReaderBase() = default;
+
+//////////////////////////////////////////////////////////////////////////
+
+// Opens the chunk store which lives in the "cid" directory of the recording
+RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath)
+{
+    CidStoreConfiguration StoreConfig;
+    StoreConfig.HugeValueThreshold = 128 * 1024 * 1024;
+    StoreConfig.RootDirectory      = m_RecordingLogDir / "cid";
+
+    m_CidStore.Initialize(StoreConfig);
+}
+
+// Flushes the chunk store before the reader goes away
+RecordingReader::~RecordingReader()
+{
+ m_CidStore.Flush();
+}
+
+// Number of action entries collected by ScanActions()
+size_t
+RecordingReader::GetActionCount() const
+{
+ return m_Actions.size();
+}
+
+// Looks the chunk up in the recording's chunk store. A failed lookup is logged as an
+// error and reported to the caller as an empty buffer.
+IoBuffer
+RecordingReader::FindChunkByCid(const IoHash& DecompressedId)
+{
+    IoBuffer Found = m_CidStore.FindChunkByCid(DecompressedId);
+
+    if (!Found)
+    {
+        ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId);
+
+        return {};
+    }
+
+    return Found;
+}
+
+/**
+ * Loads every worker listed in "workers.ztoc": the descriptor is read from
+ * "workers.zdat" at the offset/size given by the table of contents and the attachments
+ * it references are fetched from the chunk store. Attachments which are not found in
+ * the chunk store are left out of the package. As a side effect the action table of
+ * contents is scanned as well.
+ *
+ * Returns the worker packages keyed by worker id.
+ */
+std::unordered_map<zen::IoHash, zen::CbPackage>
+RecordingReader::ReadWorkers()
+{
+ std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
+
+ {
+ CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc");
+ CbObject Toc = TocFile.Object;
+
+ m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead);
+
+ ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
+
+ for (auto& It : Toc["toc"])
+ {
+ CbArrayView Entry = It.AsArrayView();
+ CbFieldViewIterator Vit = Entry.CreateViewIterator();
+
+ // Each entry is an array of [worker id, offset, size]; the fields are consumed in this order
+ const IoHash WorkerId = Vit++->AsHash();
+ const uint64_t Offset = Vit++->AsInt64(0);
+ const uint64_t Size = Vit++->AsInt64(0);
+
+ IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size);
+ CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange);
+ CbPackage& WorkerPkg = WorkerMap[WorkerId];
+ WorkerPkg.SetObject(WorkerDesc);
+
+ WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid);
+
+ // Chunks missing from the store are skipped silently
+ if (AttachmentData)
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
+ }
+ });
+ }
+ }
+
+ // Scan actions as well (this should be called separately, ideally)
+
+ ScanActions();
+
+ return WorkerMap;
+}
+
+// Reads "actions.ztoc", opens "actions.zdat" for reading and appends one entry
+// (action id, offset, size) to m_Actions for every row of the table of contents
+void
+RecordingReader::ScanActions()
+{
+    CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc");
+    CbObject         Toc     = TocFile.Object;
+
+    m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead);
+
+    ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
+
+    for (auto& Row : Toc["toc"])
+    {
+        CbArrayView         RowFields = Row.AsArrayView();
+        CbFieldViewIterator Cursor    = RowFields.CreateViewIterator();
+
+        // The fields are consumed strictly in this order: id, offset, size
+        ActionEntry Scanned;
+        Scanned.ActionId = Cursor++->AsHash();
+        Scanned.Offset   = Cursor++->AsInt64(0);
+        Scanned.Size     = Cursor++->AsInt64(0);
+
+        m_Actions.push_back(Scanned);
+    }
+}
+
+// Loads every scanned action from the action data file and hands it, together with
+// its id, to Callback. TargetParallelism is passed on to IterateOverArray.
+void
+RecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism)
+{
+    auto VisitEntry = [&](const ActionEntry& Entry) {
+        CbObject LoadedAction = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size));
+
+        Callback(LoadedAction, Entry.ActionId);
+    };
+
+    IterateOverArray(m_Actions, VisitEntry, TargetParallelism);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Returns the buffer stored for the id, or an empty buffer when the id is unknown
+IoBuffer
+LocalResolver::FindChunkByCid(const IoHash& DecompressedId)
+{
+    RwLock::SharedLockScope ReadScope(MapLock);
+
+    const auto Found = Attachments.find(DecompressedId);
+
+    if (Found == Attachments.end())
+    {
+        return {};
+    }
+
+    return Found->second;
+}
+
+// Tags the buffer as compressed binary and stores it under Cid, replacing any buffer
+// stored for that id before
+void
+LocalResolver::Add(const IoHash& Cid, IoBuffer Data)
+{
+    RwLock::ExclusiveLockScope WriteScope(MapLock);
+
+    Data.SetContentType(ZenContentType::kCompressedBinary);
+
+    IoBuffer& Slot = Attachments[Cid];
+    Slot           = Data;
+}
+
+///
+
+// Only remembers the recording directory; nothing is read from disk until ReadWorkers()
+UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath)
+{
+}
+
+// Nothing to release explicitly
+UeRecordingReader::~UeRecordingReader()
+{
+}
+
+// Number of action directories found by the most recent ReadWorkers() call
+size_t
+UeRecordingReader::GetActionCount() const
+{
+ return m_WorkDirs.size();
+}
+
+// Forwards the lookup to the local resolver, which only knows the input chunks of
+// actions that have already been read through ReadAction()
+IoBuffer
+UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId)
+{
+ return m_LocalResolver.FindChunkByCid(DecompressedId);
+}
+
+/**
+ * Traverses the recording directory, remembers every directory which contains an
+ * action and builds one package per worker directory from its "worker.ucb" descriptor
+ * and the chunk files below "chunks".
+ *
+ * A chunk file which cannot be read, or which is not in compressed buffer format, is
+ * logged and left out of the package instead of being added as an empty attachment.
+ *
+ * Returns the worker packages keyed by the hash of the worker descriptor.
+ */
+std::unordered_map<zen::IoHash, zen::CbPackage>
+UeRecordingReader::ReadWorkers()
+{
+    std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
+
+    FileSystemTraversal  Traversal;
+    RecordingTreeVisitor Visitor;
+    Traversal.TraverseFileSystem(m_RecordingDir, Visitor);
+
+    m_WorkDirs = std::move(Visitor.WorkDirs);
+
+    for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs)
+    {
+        CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb");
+        CbObject         WorkerDesc = WorkerFile.Object;
+        const IoHash&    WorkerId   = WorkerFile.Hash;
+        CbPackage&       WorkerPkg  = WorkerMap[WorkerId];
+        WorkerPkg.SetObject(WorkerDesc);
+
+        WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+            const IoHash                AttachmentCid = AttachmentField.GetValue().AsHash();
+            const std::filesystem::path ChunkPath     = WorkerDir / "chunks" / AttachmentCid.ToHexString();
+            FileContents                ChunkFile     = ReadFile(ChunkPath);
+
+            if (ChunkFile.ErrorCode)
+            {
+                ZEN_WARN("unable to read worker chunk '{}': {}", ChunkPath, ChunkFile.ErrorCode.message());
+                return;
+            }
+
+            IoBuffer         AttachmentData = ChunkFile.Flatten();
+            IoHash           RawHash;
+            uint64_t         RawSize        = 0;
+            CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+
+            if (!CompressedData)
+            {
+                ZEN_WARN("worker chunk '{}' is not a valid compressed buffer", ChunkPath);
+                return;
+            }
+
+            WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
+        });
+    }
+
+    return WorkerMap;
+}
+
+// Reads the action stored in every known action directory and passes the action
+// object and its hash to Callback. ParallelismTarget is passed on to IterateOverArray.
+void
+UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget)
+{
+    auto VisitWorkDir = [&](const std::filesystem::path& WorkDir) {
+        CbPackage     Loaded   = ReadAction(WorkDir);
+        CbObject      Action   = Loaded.GetObject();
+        const IoHash& ActionId = Loaded.GetObjectHash();
+
+        Callback(Action, ActionId);
+    };
+
+    IterateOverArray(m_WorkDirs, VisitWorkDir, ParallelismTarget);
+}
+
+/**
+ * Loads "Build.action" from WorkDir and returns it as a package together with the
+ * input chunks found below "inputs". Every input chunk is also registered with the
+ * local resolver. The raw hash of each chunk is asserted to equal the id by which the
+ * action references it.
+ */
+CbPackage
+UeRecordingReader::ReadAction(std::filesystem::path WorkDir)
+{
+    std::filesystem::path ActionPath = WorkDir / "Build.action";
+    CbObjectFromFile      LoadedFile = LoadCompactBinaryObject(ActionPath);
+    CbObject&             ActionDesc = LoadedFile.Object;
+
+    CbPackage ActionPackage;
+    ActionPackage.SetObject(ActionDesc);
+
+    ActionDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+        const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+        IoBuffer     InputData     = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten();
+
+        // Make the chunk resolvable for whoever executes the action later
+        m_LocalResolver.Add(AttachmentCid, InputData);
+
+        uint64_t         RawSize = 0;
+        IoHash           RawHash;
+        CompressedBuffer Parsed = CompressedBuffer::FromCompressed(SharedBuffer(InputData), RawHash, RawSize);
+        ZEN_ASSERT(AttachmentCid == RawHash);
+        ActionPackage.AddAttachment(CbAttachment(Parsed, RawHash));
+    });
+
+    return ActionPackage;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/remotehttprunner.cpp b/src/zencompute/remotehttprunner.cpp
new file mode 100644
index 000000000..98ced5fe8
--- /dev/null
+++ b/src/zencompute/remotehttprunner.cpp
@@ -0,0 +1,457 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "remotehttprunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/scopeguard.h>
+# include <zenhttp/httpcommon.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+// Sets up a runner that talks to the compute endpoint at "<HostName>/apply".
+//
+// InChunkResolver is kept by reference and used to look up attachments the
+// remote side reports as missing. BaseDir is forwarded to FunctionRunner.
+// The monitor thread is started as the last step of construction.
+RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName)
+: FunctionRunner(BaseDir)
+, m_Log(logging::Get("http_exec"))
+, m_ChunkResolver{InChunkResolver}
+, m_BaseUrl{fmt::format("{}/apply", HostName)}
+, m_Http(m_BaseUrl)
+{
+ // Started in the body so every member is already initialized when
+ // MonitorThreadFunction begins running.
+ m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
+}
+
+// Stops and joins the monitor thread via Shutdown() before members are destroyed.
+RemoteHttpRunner::~RemoteHttpRunner()
+{
+ Shutdown();
+}
+
+// Asks the monitor thread to exit and waits for it to finish. Calling this
+// again after the thread has been joined does nothing beyond re-signalling
+// the event.
+void
+RemoteHttpRunner::Shutdown()
+{
+    // TODO: should cleanly drain/cancel pending work
+
+    // Clear the run flag first, then wake the thread so it observes the flag.
+    m_MonitorThreadEnabled = false;
+    m_MonitorThreadEvent.Set();
+
+    if (!m_MonitorThread.joinable())
+    {
+        return;
+    }
+
+    m_MonitorThread.join();
+}
+
+// Makes sure the remote endpoint knows the worker described by WorkerPackage.
+//
+// Sequence:
+//   1. GET  /workers/<id>            - OK means the worker is already known.
+//   2. POST /workers/<id> (object)   - sent when step 1 returned NotFound.
+//   3. POST /workers/<id> (package)  - sent when step 2 returned NotFound; the
+//      response to step 2 lists the attachment hashes the remote still needs
+//      ("need"), and only those attachments are included in the package.
+//
+// Failures are logged with ZEN_ERROR and are not reported to the caller
+// (see the TODOs below).
+void
+RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+ const IoHash WorkerId = WorkerPackage.GetObjectHash();
+ CbPackage WorkerDesc = WorkerPackage;
+
+ std::string WorkerUrl = fmt::format("/workers/{}", WorkerId);
+
+ HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl);
+
+ if (WorkerResponse.StatusCode == HttpResponseCode::NotFound)
+ {
+ // Worker is unknown remotely: offer the descriptor object on its own first.
+ HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject());
+
+ if (DescResponse.StatusCode == HttpResponseCode::NotFound)
+ {
+ CbPackage Pkg = WorkerDesc;
+
+ // Build response package by sending only the attachments
+ // the other end needs. We start with the full package and
+ // remove the attachments which are not needed.
+
+ {
+ std::unordered_set<IoHash> Needed;
+
+ CbObject Response = DescResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ Needed.insert(NeedHash);
+ }
+
+ // Collected separately so Pkg is not modified while its
+ // attachment list is being iterated.
+ std::unordered_set<IoHash> ToRemove;
+
+ for (const CbAttachment& Attachment : Pkg.GetAttachments())
+ {
+ const IoHash& Hash = Attachment.GetHash();
+
+ if (Needed.find(Hash) == Needed.end())
+ {
+ ToRemove.insert(Hash);
+ }
+ }
+
+ for (const IoHash& Hash : ToRemove)
+ {
+ int RemovedCount = Pkg.RemoveAttachment(Hash);
+
+ ZEN_ASSERT(RemovedCount == 1);
+ }
+ }
+
+ // Post resulting package
+
+ HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg);
+
+ if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
+ {
+ ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
+
+ // TODO: propagate error
+ }
+ }
+ else if (!IsHttpSuccessCode(DescResponse.StatusCode))
+ {
+ ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
+
+ // TODO: propagate error
+ }
+ else
+ {
+ // Any success code other than NoContent trips this assert.
+ ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent);
+ }
+ }
+ else if (WorkerResponse.StatusCode == HttpResponseCode::OK)
+ {
+ // Already known from a previous run
+ }
+ else if (!IsHttpSuccessCode(WorkerResponse.StatusCode))
+ {
+ ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})",
+ WorkerId,
+ m_Http.GetBaseUri(),
+ WorkerUrl,
+ (int)WorkerResponse.StatusCode,
+ ToString(WorkerResponse.StatusCode));
+
+ // TODO: propagate error
+ }
+}
+
+// Returns how many additional actions this runner is currently willing to
+// accept: the configured maximum minus the number of actions tracked as
+// running remotely, or zero when the maximum has been reached.
+size_t
+RemoteHttpRunner::QueryCapacity()
+{
+    RwLock::SharedLockScope ReadGuard{m_RunningLock};
+
+    const size_t ActiveCount = m_RemoteRunningMap.size();
+    const size_t Limit       = size_t(m_MaxRunningActions);
+
+    // Guard against underflow when we are at or over the limit
+    return (ActiveCount < Limit) ? (Limit - ActiveCount) : 0;
+}
+
+// Submits each action in Actions through SubmitAction(), in order, and returns
+// one SubmitResult per input action at the same index.
+std::vector<SubmitResult>
+RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    std::vector<SubmitResult> Results;
+
+    // The final size is known up front, so allocate once instead of growing
+    // the vector repeatedly.
+    Results.reserve(Actions.size());
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Results.push_back(SubmitAction(Action));
+    }
+
+    return Results;
+}
+
+// Submits a single action to the remote endpoint.
+//
+// Sequence:
+//   1. Reject immediately if the running map is already at m_MaxRunningActions.
+//   2. POST /jobs with the bare action object.
+//   3. If the remote answers NotFound, its response lists the attachment hashes
+//      it is missing ("need"); those are resolved through m_ChunkResolver and
+//      POSTed to /jobs again as a package.
+//   4. On success the response carries the remote "lsn", under which the action
+//      is tracked in m_RemoteRunningMap, and the action is marked Running.
+//
+// Returns a SubmitResult with IsAccepted = true only when a non-zero remote LSN
+// was received. Every other path returns either an explicit rejection (with a
+// Reason where available) or a default-constructed SubmitResult.
+SubmitResult
+RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+ // Verify whether we can accept more work
+
+ // NOTE(review): the lock is released before the HTTP round trip below, so
+ // concurrent callers can all pass this check - confirm this is acceptable.
+ {
+ RwLock::SharedLockScope _{m_RunningLock};
+ if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
+ {
+ return SubmitResult{.IsAccepted = false};
+ }
+ }
+
+ using namespace std::literals;
+
+ // Each enqueued action is assigned an integer index (logical sequence number),
+ // which we use as a key for tracking data structures and as an opaque id which
+ // may be used by clients to reference the scheduled action
+
+ const int32_t ActionLsn = Action->ActionLsn;
+ const CbObject& ActionObj = Action->ActionObj;
+ const IoHash ActionId = ActionObj.GetHash();
+
+ MaybeDumpAction(ActionLsn, ActionObj);
+
+ // Enqueue job
+
+ // Stays empty unless one of the POSTs below returns OK.
+ CbObject Result;
+
+ HttpClient::Response WorkResponse = m_Http.Post("/jobs", ActionObj);
+ HttpResponseCode WorkResponseCode = WorkResponse.StatusCode;
+
+ if (WorkResponseCode == HttpResponseCode::OK)
+ {
+ Result = WorkResponse.AsObject();
+ }
+ else if (WorkResponseCode == HttpResponseCode::NotFound)
+ {
+ // Not all attachments are present
+
+ // Build response package including all required attachments
+
+ CbPackage Pkg;
+ Pkg.SetObject(ActionObj);
+
+ CbObject Response = WorkResponse.AsObject();
+
+ for (auto& Item : Response["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ }
+ else
+ {
+ // No such attachment
+
+ return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+ }
+ }
+
+ // Post resulting package
+
+ HttpClient::Response PayloadResponse = m_Http.Post("/jobs", Pkg);
+
+ if (!PayloadResponse)
+ {
+ ZEN_WARN("unable to register payloads for action {} at {}/jobs", ActionId, m_Http.GetBaseUri());
+
+ // TODO: include more information about the failure in the response
+
+ return {.IsAccepted = false, .Reason = "HTTP request failed"};
+ }
+ else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
+ {
+ Result = PayloadResponse.AsObject();
+ }
+ else
+ {
+ // Unexpected response
+
+ const int ResponseStatusCode = (int)PayloadResponse.StatusCode;
+
+ ZEN_WARN("unable to register payloads for action {} at {}/jobs (error: {} {})",
+ ActionId,
+ m_Http.GetBaseUri(),
+ ResponseStatusCode,
+ ToString(ResponseStatusCode));
+
+ return {.IsAccepted = false,
+ .Reason = fmt::format("unexpected response code {} {} from {}/jobs",
+ ResponseStatusCode,
+ ToString(ResponseStatusCode),
+ m_Http.GetBaseUri())};
+ }
+ }
+
+ // Any status other than OK / NotFound on the first POST leaves Result empty
+ // and falls through to the default return at the end, without logging.
+
+ if (Result)
+ {
+ // A missing "lsn" field reads as 0 and is treated as "not scheduled".
+ if (const int32_t LsnField = Result["lsn"].AsInt32(0))
+ {
+ HttpRunningAction NewAction;
+ NewAction.Action = Action;
+ NewAction.RemoteActionLsn = LsnField;
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+
+ m_RemoteRunningMap[LsnField] = std::move(NewAction);
+ }
+
+ ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn);
+
+ Action->SetActionState(RunnerAction::State::Running);
+
+ return SubmitResult{.IsAccepted = true};
+ }
+ }
+
+ return {};
+}
+
+// Probes the remote "/ready" endpoint and reports whether the request
+// produced a response that evaluates as successful.
+bool
+RemoteHttpRunner::IsHealthy()
+{
+    HttpClient::Response Ready = m_Http.Get("/ready");
+
+    if (!Ready)
+    {
+        // TODO: use response to propagate context
+        return false;
+    }
+
+    return true;
+}
+
+// Number of actions currently tracked as submitted to the remote and not yet
+// retired by a sweep.
+size_t
+RemoteHttpRunner::GetSubmittedActionCount()
+{
+    RwLock::SharedLockScope ReadGuard(m_RunningLock);
+
+    const size_t SubmittedCount = m_RemoteRunningMap.size();
+
+    return SubmittedCount;
+}
+
+// Body of the monitor thread started by the constructor.
+//
+// Repeatedly waits on m_MonitorThreadEvent and calls SweepRunningActions()
+// each time the wait returns false; once the wait returns true it sweeps one
+// more time and then re-checks m_MonitorThreadEnabled. The wait interval is
+// adapted after every sweep:
+//   - more than 16 actions outstanding  -> a quarter of the normal interval
+//   - the sweep retired at least one    -> half the normal interval
+//   - otherwise                         -> the normal interval (1000 ms)
+void
+RemoteHttpRunner::MonitorThreadFunction()
+{
+ SetCurrentThreadName("RemoteHttpRunner_Monitor");
+
+ do
+ {
+ const int NormalWaitingTime = 1000;
+ int WaitTimeMs = NormalWaitingTime;
+ // WaitOnce captures WaitTimeMs by reference so it picks up the value
+ // SweepOnce chose on the previous iteration.
+ auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
+ auto SweepOnce = [&] {
+ const size_t RetiredCount = SweepRunningActions();
+
+ m_RunningLock.WithSharedLock([&] {
+ if (m_RemoteRunningMap.size() > 16)
+ {
+ WaitTimeMs = NormalWaitingTime / 4;
+ }
+ else
+ {
+ if (RetiredCount)
+ {
+ WaitTimeMs = NormalWaitingTime / 2;
+ }
+ else
+ {
+ WaitTimeMs = NormalWaitingTime;
+ }
+ }
+ });
+ };
+
+ // NOTE(review): presumably Wait() returns false on timeout and true when
+ // the event was signalled - confirm against Event::Wait.
+ while (!WaitOnce())
+ {
+ SweepOnce();
+ }
+
+ // Signal received - this may mean we should quit
+
+ SweepOnce();
+ } while (m_MonitorThreadEnabled);
+}
+
+// Polls the remote for completed jobs and retires the matching local actions.
+//
+// For every LSN listed under "completed" in the /jobs/completed response, the
+// job result is fetched from /jobs/<lsn>; if that LSN is tracked in
+// m_RemoteRunningMap the entry is moved out of the map and queued for
+// notification. The response's "metrics" object, when present, is used to
+// lower m_MaxRunningActions (never to raise it).
+//
+// Returns the number of actions retired by this sweep.
+size_t
+RemoteHttpRunner::SweepRunningActions()
+{
+ std::vector<HttpRunningAction> CompletedActions;
+
+ // Poll remote for list of completed actions
+
+ HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv);
+
+ if (CbObject Completed = ResponseCompleted.AsObject())
+ {
+ for (auto& FieldIt : Completed["completed"sv])
+ {
+ const int32_t CompleteLsn = FieldIt.AsInt32();
+
+ // The result is fetched before taking the lock so no HTTP request is
+ // made while m_RunningLock is held.
+ if (HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn)))
+ {
+ m_RunningLock.WithExclusiveLock([&] {
+ if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end())
+ {
+ HttpRunningAction CompletedAction = std::move(CompleteIt->second);
+ CompletedAction.ActionResults = ResponseJob.AsPackage();
+ CompletedAction.Success = true;
+
+ CompletedActions.push_back(std::move(CompletedAction));
+ m_RemoteRunningMap.erase(CompleteIt);
+ }
+ else
+ {
+ // we received a completion notice for an action we don't know about,
+ // this can happen if the runner is used by multiple upstream schedulers,
+ // or if this compute node was recently restarted and lost track of
+ // previously scheduled actions
+ }
+ });
+ }
+ }
+
+ if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView())
+ {
+ // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0))
+ if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0))
+ {
+ // Never cap below 4, regardless of what the remote reports.
+ const int32_t NewCap = zen::Max(4, CpuCount);
+
+ // NOTE(review): m_MaxRunningActions is written here without holding
+ // m_RunningLock while other threads read it - confirm this is intended.
+ if (m_MaxRunningActions > NewCap)
+ {
+ ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions);
+
+ m_MaxRunningActions = NewCap;
+ }
+ }
+ }
+ }
+
+ // Notify outer. Note that this has to be done without holding any local locks
+ // otherwise we may end up with deadlocks.
+
+ for (HttpRunningAction& HttpAction : CompletedActions)
+ {
+ const int ActionLsn = HttpAction.Action->ActionLsn;
+
+ // Success is set to true for every entry queued above, so the failure
+ // branch is not reached from this function as written.
+ if (HttpAction.Success)
+ {
+ ZEN_DEBUG("completed: {} LSN {} (remote LSN {})", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn);
+
+ // NOTE(review): the state is set to Completed before the result is
+ // attached - confirm observers cannot read the result in between.
+ HttpAction.Action->SetActionState(RunnerAction::State::Completed);
+
+ HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
+ }
+ else
+ {
+ HttpAction.Action->SetActionState(RunnerAction::State::Failed);
+ }
+ }
+
+ return CompletedActions.size();
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/remotehttprunner.h b/src/zencompute/remotehttprunner.h
new file mode 100644
index 000000000..1e885da3d
--- /dev/null
+++ b/src/zencompute/remotehttprunner.h
@@ -0,0 +1,80 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+# include <zencore/zencore.h>
+# include <zenhttp/httpclient.h>
+
+# include <atomic>
+# include <filesystem>
+# include <thread>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+/** HTTP-based runner
+
+ This implements a DDC remote compute execution strategy via REST API
+
+ */
+
+class RemoteHttpRunner : public FunctionRunner
+{
+ // Not movable: the monitor thread is started with a pointer to this object.
+ RemoteHttpRunner(RemoteHttpRunner&&) = delete;
+ RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;
+
+public:
+ // Starts the monitor thread; the remote base URL is "<HostName>/apply".
+ RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName);
+ // Calls Shutdown().
+ ~RemoteHttpRunner();
+
+ // Stops and joins the monitor thread.
+ virtual void Shutdown() override;
+ // Registers the worker description (and any attachments the remote asks for).
+ virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ // Submits one action; IsAccepted is true only if the remote returned an LSN.
+ [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ // True when a GET of "/ready" succeeds.
+ [[nodiscard]] virtual bool IsHealthy() override;
+ // Number of actions currently tracked in m_RemoteRunningMap.
+ [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
+ // Remaining headroom: m_MaxRunningActions minus the tracked action count.
+ [[nodiscard]] virtual size_t QueryCapacity() override;
+ // Submits each action in turn; results are index-aligned with Actions.
+ [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
+
+protected:
+ LoggerRef Log() { return m_Log; }
+
+private:
+ LoggerRef m_Log;
+ ChunkResolver& m_ChunkResolver; // used to resolve attachments the remote is missing
+ std::string m_BaseUrl;
+ HttpClient m_Http; // constructed from m_BaseUrl, so it must be declared after it
+
+ // Lowered by SweepRunningActions() based on the remote's reported "lp_count".
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+
+ // Bookkeeping for one action that has been handed to the remote.
+ struct HttpRunningAction
+ {
+ Ref<RunnerAction> Action;
+ int RemoteActionLsn = 0; // Remote LSN
+ bool Success = false;
+ CbPackage ActionResults;
+ };
+
+ // m_RunningLock guards m_RemoteRunningMap.
+ RwLock m_RunningLock;
+ std::unordered_map<int, HttpRunningAction> m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn
+
+ std::thread m_MonitorThread;
+ std::atomic<bool> m_MonitorThreadEnabled{true}; // cleared by Shutdown()
+ Event m_MonitorThreadEvent; // set by Shutdown() to wake the thread
+ void MonitorThreadFunction();
+ size_t SweepRunningActions();
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/xmake.lua b/src/zencompute/xmake.lua
new file mode 100644
index 000000000..c710b662d
--- /dev/null
+++ b/src/zencompute/xmake.lua
@@ -0,0 +1,11 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+-- Build target for the zencompute module: a static library in the "libs"
+-- group, built from the sources matched by "**.cpp".
+target('zencompute')
+ set_kind("static")
+ set_group("libs")
+ add_headerfiles("**.h")
+ add_files("**.cpp")
+ -- "include" is public so dependents can use <zencompute/...> headers
+ add_includedirs("include", {public=true})
+ add_deps("zencore", "zenstore", "zenutil", "zennet", "zenhttp")
+ add_packages("vcpkg::gsl-lite")
+ add_packages("vcpkg::spdlog", "vcpkg::cxxopts")
diff --git a/src/zencompute/zencompute.cpp b/src/zencompute/zencompute.cpp
new file mode 100644
index 000000000..633250f4e
--- /dev/null
+++ b/src/zencompute/zencompute.cpp
@@ -0,0 +1,12 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/zencompute.h"
+
+namespace zen {
+
+// Intentionally empty.
+// NOTE(review): presumably exists so callers can reference a symbol from this
+// library and force its test code to be linked in - confirm against callers.
+void
+zencompute_forcelinktests()
+{
+}
+
+} // namespace zen
diff --git a/src/zencore/include/zencore/system.h b/src/zencore/include/zencore/system.h
index aec2e0ce4..bf3c15d3d 100644
--- a/src/zencore/include/zencore/system.h
+++ b/src/zencore/include/zencore/system.h
@@ -25,6 +25,7 @@ struct SystemMetrics
uint64_t AvailVirtualMemoryMiB = 0;
uint64_t PageFileMiB = 0;
uint64_t AvailPageFileMiB = 0;
+ float CpuUsagePercent = 0.0f;
};
SystemMetrics GetSystemMetrics();
diff --git a/src/zencore/system.cpp b/src/zencore/system.cpp
index e92691781..267c87e12 100644
--- a/src/zencore/system.cpp
+++ b/src/zencore/system.cpp
@@ -13,6 +13,8 @@
ZEN_THIRD_PARTY_INCLUDES_START
# include <iphlpapi.h>
# include <winsock2.h>
+# include <pdh.h>
+# pragma comment(lib, "pdh.lib")
ZEN_THIRD_PARTY_INCLUDES_END
#elif ZEN_PLATFORM_LINUX
# include <sys/utsname.h>
@@ -65,55 +67,98 @@ GetSystemMetrics()
// Determine physical core count
- DWORD BufferSize = 0;
- BOOL Result = GetLogicalProcessorInformationEx(RelationAll, nullptr, &BufferSize);
- if (int32_t Error = GetLastError(); Error != ERROR_INSUFFICIENT_BUFFER)
{
- ThrowSystemError(Error, "Failed to get buffer size for logical processor information");
- }
-
- PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX Buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)Memory::Alloc(BufferSize);
+ DWORD BufferSize = 0;
+ BOOL Result = GetLogicalProcessorInformationEx(RelationAll, nullptr, &BufferSize);
+ if (int32_t Error = GetLastError(); Error != ERROR_INSUFFICIENT_BUFFER)
+ {
+ ThrowSystemError(Error, "Failed to get buffer size for logical processor information");
+ }
- Result = GetLogicalProcessorInformationEx(RelationAll, Buffer, &BufferSize);
- if (!Result)
- {
- Memory::Free(Buffer);
- throw std::runtime_error("Failed to get logical processor information");
- }
+ PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX Buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)Memory::Alloc(BufferSize);
- DWORD ProcessorPkgCount = 0;
- DWORD ProcessorCoreCount = 0;
- DWORD ByteOffset = 0;
- while (ByteOffset + sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) <= BufferSize)
- {
- const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX& Slpi = Buffer[ByteOffset / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)];
- if (Slpi.Relationship == RelationProcessorCore)
+ Result = GetLogicalProcessorInformationEx(RelationAll, Buffer, &BufferSize);
+ if (!Result)
{
- ProcessorCoreCount++;
+ Memory::Free(Buffer);
+ throw std::runtime_error("Failed to get logical processor information");
}
- else if (Slpi.Relationship == RelationProcessorPackage)
+
+ DWORD ProcessorPkgCount = 0;
+ DWORD ProcessorCoreCount = 0;
+ DWORD LogicalProcessorCount = 0;
+
+ BYTE* Ptr = reinterpret_cast<BYTE*>(Buffer);
+ BYTE* const End = Ptr + BufferSize;
+ while (Ptr < End)
{
- ProcessorPkgCount++;
+ const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX& Slpi = *reinterpret_cast<const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*>(Ptr);
+ if (Slpi.Relationship == RelationProcessorCore)
+ {
+ ++ProcessorCoreCount;
+
+ // Count logical processors (threads) across all processor groups for this core.
+ // Each core entry lists one GROUP_AFFINITY per group it spans; each set bit
+ // in the Mask represents one logical processor (HyperThreading sibling).
+ for (WORD g = 0; g < Slpi.Processor.GroupCount; ++g)
+ {
+ LogicalProcessorCount += static_cast<DWORD>(__popcnt64(Slpi.Processor.GroupMask[g].Mask));
+ }
+ }
+ else if (Slpi.Relationship == RelationProcessorPackage)
+ {
+ ++ProcessorPkgCount;
+ }
+ Ptr += Slpi.Size;
}
- ByteOffset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
- }
- Metrics.CoreCount = ProcessorCoreCount;
- Metrics.CpuCount = ProcessorPkgCount;
+ Metrics.CoreCount = ProcessorCoreCount;
+ Metrics.CpuCount = ProcessorPkgCount;
+ Metrics.LogicalProcessorCount = LogicalProcessorCount;
- Memory::Free(Buffer);
+ Memory::Free(Buffer);
+ }
// Query memory status
- MEMORYSTATUSEX MemStatus{.dwLength = sizeof(MEMORYSTATUSEX)};
- GlobalMemoryStatusEx(&MemStatus);
+ {
+ MEMORYSTATUSEX MemStatus{.dwLength = sizeof(MEMORYSTATUSEX)};
+ GlobalMemoryStatusEx(&MemStatus);
+
+ Metrics.SystemMemoryMiB = MemStatus.ullTotalPhys / 1024 / 1024;
+ Metrics.AvailSystemMemoryMiB = MemStatus.ullAvailPhys / 1024 / 1024;
+ Metrics.VirtualMemoryMiB = MemStatus.ullTotalVirtual / 1024 / 1024;
+ Metrics.AvailVirtualMemoryMiB = MemStatus.ullAvailVirtual / 1024 / 1024;
+ Metrics.PageFileMiB = MemStatus.ullTotalPageFile / 1024 / 1024;
+ Metrics.AvailPageFileMiB = MemStatus.ullAvailPageFile / 1024 / 1024;
+ }
+
+ // Query CPU usage using PDH
+ //
+ // TODO: This should be changed to not require a Sleep, perhaps by using some
+ // background metrics gathering mechanism.
+
+ {
+ PDH_HQUERY QueryHandle = nullptr;
+ PDH_HCOUNTER CounterHandle = nullptr;
- Metrics.SystemMemoryMiB = MemStatus.ullTotalPhys / 1024 / 1024;
- Metrics.AvailSystemMemoryMiB = MemStatus.ullAvailPhys / 1024 / 1024;
- Metrics.VirtualMemoryMiB = MemStatus.ullTotalVirtual / 1024 / 1024;
- Metrics.AvailVirtualMemoryMiB = MemStatus.ullAvailVirtual / 1024 / 1024;
- Metrics.PageFileMiB = MemStatus.ullTotalPageFile / 1024 / 1024;
- Metrics.AvailPageFileMiB = MemStatus.ullAvailPageFile / 1024 / 1024;
+ if (PdhOpenQueryW(nullptr, 0, &QueryHandle) == ERROR_SUCCESS)
+ {
+ if (PdhAddEnglishCounterW(QueryHandle, L"\\Processor(_Total)\\% Processor Time", 0, &CounterHandle) == ERROR_SUCCESS)
+ {
+ PdhCollectQueryData(QueryHandle);
+ Sleep(100);
+ PdhCollectQueryData(QueryHandle);
+
+ PDH_FMT_COUNTERVALUE CounterValue;
+ if (PdhGetFormattedCounterValue(CounterHandle, PDH_FMT_DOUBLE, nullptr, &CounterValue) == ERROR_SUCCESS)
+ {
+ Metrics.CpuUsagePercent = static_cast<float>(CounterValue.doubleValue);
+ }
+ }
+ PdhCloseQuery(QueryHandle);
+ }
+ }
return Metrics;
}
@@ -190,6 +235,39 @@ GetSystemMetrics()
}
}
+ // Query CPU usage
+ Metrics.CpuUsagePercent = 0.0f;
+ if (FILE* Stat = fopen("/proc/stat", "r"))
+ {
+ char Line[256];
+ unsigned long User, Nice, System, Idle, IoWait, Irq, SoftIrq;
+ static unsigned long PrevUser = 0, PrevNice = 0, PrevSystem = 0, PrevIdle = 0, PrevIoWait = 0, PrevIrq = 0, PrevSoftIrq = 0;
+
+ if (fgets(Line, sizeof(Line), Stat))
+ {
+ if (sscanf(Line, "cpu %lu %lu %lu %lu %lu %lu %lu", &User, &Nice, &System, &Idle, &IoWait, &Irq, &SoftIrq) == 7)
+ {
+ unsigned long TotalDelta = (User + Nice + System + Idle + IoWait + Irq + SoftIrq) -
+ (PrevUser + PrevNice + PrevSystem + PrevIdle + PrevIoWait + PrevIrq + PrevSoftIrq);
+ unsigned long IdleDelta = Idle - PrevIdle;
+
+ if (TotalDelta > 0)
+ {
+ Metrics.CpuUsagePercent = 100.0f * (TotalDelta - IdleDelta) / TotalDelta;
+ }
+
+ PrevUser = User;
+ PrevNice = Nice;
+ PrevSystem = System;
+ PrevIdle = Idle;
+ PrevIoWait = IoWait;
+ PrevIrq = Irq;
+ PrevSoftIrq = SoftIrq;
+ }
+ }
+ fclose(Stat);
+ }
+
// Get memory information
long Pages = sysconf(_SC_PHYS_PAGES);
long PageSize = sysconf(_SC_PAGE_SIZE);
@@ -270,6 +348,25 @@ GetSystemMetrics()
sysctlbyname("hw.packages", &Packages, &Size, nullptr, 0);
Metrics.CpuCount = Packages > 0 ? Packages : 1;
+ // Query CPU usage using host_statistics (HOST_CPU_LOAD_INFO)
+ Metrics.CpuUsagePercent = 0.0f;
+ host_cpu_load_info_data_t CpuLoad;
+ mach_msg_type_number_t CpuCount = sizeof(CpuLoad) / sizeof(natural_t);
+ if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, (host_info_t)&CpuLoad, &CpuCount) == KERN_SUCCESS)
+ {
+ unsigned long TotalTicks = 0;
+ for (int i = 0; i < CPU_STATE_MAX; ++i)
+ {
+ TotalTicks += CpuLoad.cpu_ticks[i];
+ }
+
+ if (TotalTicks > 0)
+ {
+ unsigned long IdleTicks = CpuLoad.cpu_ticks[CPU_STATE_IDLE];
+ Metrics.CpuUsagePercent = 100.0f * (TotalTicks - IdleTicks) / TotalTicks;
+ }
+ }
+
// Get memory information
uint64_t MemSize = 0;
Size = sizeof(MemSize);
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index 14896c803..c640ba90b 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -331,6 +331,8 @@ public:
virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override;
virtual bool TryGetRanges(HttpRanges& Ranges) override;
+ void LogRequest(HttpMessageResponseRequest* Response);
+
using HttpServerRequest::WriteResponse;
HttpSysServerRequest(const HttpSysServerRequest&) = delete;
@@ -429,7 +431,8 @@ public:
virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override;
void SuppressResponseBody(); // typically used for HEAD requests
- inline int64_t GetResponseBodySize() const { return m_TotalDataSize; }
+ inline uint16_t GetResponseCode() const { return m_ResponseCode; }
+ inline int64_t GetResponseBodySize() const { return m_TotalDataSize; }
private:
eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks;
@@ -1886,7 +1889,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode)
ZEN_ASSERT(IsHandled() == false);
- auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
+ HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
if (SuppressBody())
{
@@ -1904,6 +1907,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode)
# endif
SetIsHandled();
+ LogRequest(Response);
}
void
@@ -1913,7 +1917,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
ZEN_ASSERT(IsHandled() == false);
- auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
+ HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
if (SuppressBody())
{
@@ -1931,6 +1935,20 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
# endif
SetIsHandled();
+ LogRequest(Response);
+}
+
+// Emits one info-level log line per handled request of the form
+// "<verb> <uri> <status> -> <body size>", but only when request logging is
+// enabled for this request.
+// NOTE(review): Response is dereferenced after the response has been handed
+// off by the callers - confirm it is guaranteed to still be alive here.
+void
+HttpSysServerRequest::LogRequest(HttpMessageResponseRequest* Response)
+{
+    if (!ShouldLogRequest())
+    {
+        return;
+    }
+
+    const uint16_t StatusCode = Response->GetResponseCode();
+    const int64_t  BodySize   = Response->GetResponseBodySize();
+
+    ZEN_INFO("{} {} {} -> {}", ToString(RequestVerb()), m_UriUtf8.c_str(), StatusCode, NiceBytes(BodySize));
 }
void
@@ -1959,6 +1977,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
# endif
SetIsHandled();
+ LogRequest(Response);
}
void
diff --git a/src/zennet/beacon.cpp b/src/zennet/beacon.cpp
new file mode 100644
index 000000000..394a4afbb
--- /dev/null
+++ b/src/zennet/beacon.cpp
@@ -0,0 +1,170 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zennet/beacon.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/session.h>
+#include <zencore/uid.h>
+
+#include <fmt/format.h>
+#include <asio.hpp>
+#include <map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Private implementation of FsBeacon. Each session announces itself in a group
+// by holding open a marker file at "<ShareRoot>/<GroupId>/<SessionId>".
+struct FsBeacon::Impl
+{
+ Impl(std::filesystem::path ShareRoot);
+ ~Impl();
+
+ void EnsureValid();
+
+ // Registers this session in GroupId by writing Metadata to its marker file.
+ void AddGroup(std::string_view GroupId, CbObject Metadata);
+ // Appends the session ids found in GroupId to OutSessions.
+ void ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions);
+ // Appends the metadata object of each listed session that could be loaded.
+ void ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata);
+
+private:
+ std::filesystem::path m_ShareRoot;
+ zen::Oid m_SessionId; // this process' session id, taken from GetSessionId()
+
+ // Per-group registration state; LockFile is the open marker file handle.
+ struct GroupData
+ {
+ CbObject Metadata;
+ BasicFile LockFile;
+ };
+
+ // Keyed on group id.
+ std::map<std::string, GroupData> m_Registration;
+
+ // Builds "<ShareRoot>/<GroupId>/<SessionId as string>".
+ std::filesystem::path GetSessionMarkerPath(std::string_view GroupId, const Oid& SessionId)
+ {
+ Oid::String_t SessionIdString;
+ SessionId.ToString(SessionIdString);
+
+ return m_ShareRoot / GroupId / SessionIdString;
+ }
+};
+
+// Remembers the share root and captures the current session id; does not touch the file system.
+FsBeacon::Impl::Impl(std::filesystem::path ShareRoot) : m_ShareRoot(ShareRoot), m_SessionId(GetSessionId())
+{
+}
+
+// No explicit cleanup; members (including the open marker files in m_Registration) are destroyed implicitly.
+FsBeacon::Impl::~Impl()
+{
+}
+
+// Currently a no-op.
+void
+FsBeacon::Impl::EnsureValid()
+{
+}
+
+// Registers this session as a member of GroupId.
+//
+// Creates the group directory under the share root if needed, opens (and
+// truncates) this session's marker file, writes Metadata into it and flushes.
+// The file handle is kept open in m_Registration for the lifetime of the entry.
+//
+// Throws std::system_error if the marker file cannot be opened or written.
+void
+FsBeacon::Impl::AddGroup(std::string_view GroupId, CbObject Metadata)
+{
+ zen::CreateDirectories(m_ShareRoot / GroupId);
+ std::filesystem::path MarkerFile = GetSessionMarkerPath(GroupId, m_SessionId);
+
+ // operator[] inserts the entry up front, so it remains in the map even if
+ // opening or writing the marker file throws below.
+ GroupData& Group = m_Registration[std::string(GroupId)];
+
+ Group.Metadata = Metadata;
+
+ std::error_code Ec;
+ // NOTE(review): kPreventDelete / kPreventWrite presumably stop other
+ // processes from deleting or writing the marker while it is held open,
+ // which ScanGroup relies on - confirm against BasicFile.
+ Group.LockFile.Open(MarkerFile,
+ BasicFile::Mode::kTruncate | BasicFile::Mode::kPreventDelete |
+ BasicFile::Mode::kPreventWrite /* | BasicFile::Mode::kDeleteOnClose */,
+ Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to open beacon marker file '{}' for write", MarkerFile));
+ }
+
+ Group.LockFile.WriteAll(Metadata.GetBuffer().AsIoBuffer(), Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to write to beacon marker file '{}'", MarkerFile));
+ }
+
+ Group.LockFile.Flush();
+}
+
+// Collects the ids of the sessions currently registered in GroupId.
+//
+// Every file in the group directory is first offered to
+// std::filesystem::remove(); a marker that can be removed is treated as stale
+// and is gone afterwards. A marker that could NOT be removed is reported in
+// OutSessions when its file name is a valid Oid string and the file is
+// non-empty.
+//
+// OutSessions is appended to, not cleared.
+void
+FsBeacon::Impl::ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions)
+{
+    DirectoryContent Dc;
+    zen::GetDirectoryContent(m_ShareRoot / GroupId, zen::DirectoryContentFlags::IncludeFiles, /* out */ Dc);
+
+    for (const std::filesystem::path& FilePath : Dc.Files)
+    {
+        std::error_code Ec;
+        if (std::filesystem::remove(FilePath, Ec))
+        {
+            // Stale marker, now cleaned up
+            continue;
+        }
+
+        const auto FileString = FilePath.filename().generic_string();
+
+        if (FileString.length() != Oid::StringLength)
+            continue;
+
+        if (const Oid SessionId = Oid::FromHexString(FileString))
+        {
+            // Query the size through the same path that was passed to remove()
+            // above; the bare file name would be resolved against the current
+            // working directory. The error_code overload of file_size()
+            // returns uintmax_t(-1) on failure, which would pass a plain
+            // "> 0" test, so the error code has to be checked as well.
+            const auto MarkerSize = std::filesystem::file_size(FilePath, Ec);
+
+            if (!Ec && MarkerSize > 0)
+            {
+                OutSessions.push_back(SessionId);
+            }
+        }
+    }
+}
+
+// Loads the metadata object stored in the marker file of each session in
+// InSessions and appends it to OutMetadata. Sessions whose marker file does
+// not yield an object are skipped, so OutMetadata may end up with fewer new
+// entries than InSessions has elements.
+void
+FsBeacon::Impl::ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata)
+{
+    for (const Oid& SessionId : InSessions)
+    {
+        CbObject Loaded = LoadCompactBinaryObject(GetSessionMarkerPath(GroupId, SessionId)).Object;
+
+        if (!Loaded)
+        {
+            continue;
+        }
+
+        OutMetadata.push_back(std::move(Loaded));
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Creates the implementation object rooted at ShareRoot.
+FsBeacon::FsBeacon(std::filesystem::path ShareRoot) : m_Impl(std::make_unique<Impl>(ShareRoot))
+{
+}
+
+// Defined here, where Impl is a complete type, so std::unique_ptr<Impl> can destroy it.
+FsBeacon::~FsBeacon()
+{
+}
+
+// Forwards to Impl::AddGroup.
+void
+FsBeacon::AddGroup(std::string_view GroupId, CbObject Metadata)
+{
+ m_Impl->AddGroup(GroupId, Metadata);
+}
+
+// Forwards to Impl::ScanGroup.
+void
+FsBeacon::ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions)
+{
+ m_Impl->ScanGroup(GroupId, OutSessions);
+}
+
+// Forwards to Impl::ReadMetadata.
+void
+FsBeacon::ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata)
+{
+ m_Impl->ReadMetadata(GroupId, InSessions, OutMetadata);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+} // namespace zen
diff --git a/src/zennet/include/zennet/beacon.h b/src/zennet/include/zennet/beacon.h
new file mode 100644
index 000000000..a8d4805cb
--- /dev/null
+++ b/src/zennet/include/zennet/beacon.h
@@ -0,0 +1,38 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zennet/zennet.h>
+
+#include <zencore/uid.h>
+
+#include <filesystem>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace zen {
+
+class CbObject;
+
+/** File-system based peer discovery
+
+ Intended to be used with an SMB file share as the root.
+ */
+
+class FsBeacon
+{
+public:
+ // ShareRoot is the directory under which per-group subdirectories are kept.
+ FsBeacon(std::filesystem::path ShareRoot);
+ ~FsBeacon();
+
+ // Announces the current session in GroupId, publishing Metadata for peers to read.
+ void AddGroup(std::string_view GroupId, CbObject Metadata);
+ // Appends the ids of sessions found in GroupId to OutSessions.
+ void ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions);
+ // Appends the metadata of each session in InSessions that could be loaded to OutMetadata.
+ void ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata);
+
+private:
+ // Implementation is hidden behind a pointer; Impl is defined in beacon.cpp.
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
diff --git a/src/zennet/include/zennet/statsdclient.h b/src/zennet/include/zennet/statsdclient.h
index c378e49ce..7688c132c 100644
--- a/src/zennet/include/zennet/statsdclient.h
+++ b/src/zennet/include/zennet/statsdclient.h
@@ -8,6 +8,8 @@
#include <memory>
#include <string_view>
+#undef SendMessage
+
namespace zen {
class StatsTransportBase
diff --git a/src/zennet/statsdclient.cpp b/src/zennet/statsdclient.cpp
index fe5ca4dda..a0e8cb6ce 100644
--- a/src/zennet/statsdclient.cpp
+++ b/src/zennet/statsdclient.cpp
@@ -12,6 +12,7 @@
ZEN_THIRD_PARTY_INCLUDES_START
#include <zencore/windows.h>
#include <asio.hpp>
+#undef SendMessage
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 2319ad66d..ade431393 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -4083,7 +4083,8 @@ BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::L
}
bool
-BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes,
+BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash,
+ std::span<const IoHash> ChunkRawHashes,
std::span<const uint32_t> ChunkCompressedLengths,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4115,9 +4116,34 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkR
uint64_t VerifyChunkSize;
CompressedBuffer CompressedChunk =
CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
- ZEN_ASSERT(CompressedChunk);
- ZEN_ASSERT(VerifyChunkHash == ChunkHash);
- ZEN_ASSERT(VerifyChunkSize == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (!CompressedChunk)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash));
+ }
+ if (VerifyChunkHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ VerifyChunkHash));
+ }
+ if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
+ {
+ throw std::runtime_error(
+ fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ VerifyChunkSize,
+ m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
+ }
OodleCompressor ChunkCompressor;
OodleCompressionLevel ChunkCompressionLevel;
@@ -4138,7 +4164,18 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkR
{
Decompressed = CompressedChunk.Decompress().AsIoBuffer();
}
- ZEN_ASSERT(Decompressed.GetSize() == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+
+ if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ Decompressed.GetSize(),
+ m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
+ }
+
ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
@@ -4237,7 +4274,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
const std::vector<uint32_t> ChunkCompressedLengths =
ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
- if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
ChunkCompressedLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4252,7 +4290,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
return false;
}
- if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4283,7 +4322,8 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
const MemoryView BlockView = BlockMemoryBuffer.GetView();
BlockWriteOps Ops;
- if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -5334,6 +5374,13 @@ BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content,
ZEN_ASSERT(!ChunkLocations.empty());
CompositeBuffer Chunk =
OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (!Chunk)
+ {
+ throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'",
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex],
+ Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]]));
+ }
ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
return Chunk;
};
@@ -5362,10 +5409,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
Content.ChunkedContent.ChunkHashes[ChunkIndex],
[this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
- if (!Chunk)
- {
- ZEN_ASSERT(false);
- }
+ ZEN_ASSERT(Chunk);
uint64_t RawSize = Chunk.GetSize();
const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock &&
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 6304159ae..9e5bf8d91 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -339,7 +339,8 @@ private:
const uint64_t FileOffset,
const uint32_t PathIndex);
- bool GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes,
+ bool GetBlockWriteOps(const IoHash& BlockRawHash,
+ std::span<const IoHash> ChunkRawHashes,
std::span<const uint32_t> ChunkCompressedLengths,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
diff --git a/src/zenserver-test/function-tests.cpp b/src/zenserver-test/function-tests.cpp
new file mode 100644
index 000000000..559387fa2
--- /dev/null
+++ b/src/zenserver-test/function-tests.cpp
@@ -0,0 +1,34 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/zencore.h>
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/string.h>
+# include <zencore/testing.h>
+# include <zenutil/zenserverprocess.h>
+
+# include "zenserver-test.h"
+
+namespace zen::tests {
+
+using namespace std::literals;
+
+// Smoke test: spawns a server instance with a fresh data directory and waits
+// until it reports ready. There are no assertions beyond the calls themselves.
+TEST_CASE("function.run")
+{
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+ // NOTE(review): fixed port number - presumably the base port; confirm it cannot
+ // collide with other tests running concurrently.
+ Instance.SpawnServer(13337);
+
+ ZEN_INFO("Waiting...");
+
+ Instance.WaitUntilReady();
+}
+
+} // namespace zen::tests
+
+#endif
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
new file mode 100644
index 000000000..173f56386
--- /dev/null
+++ b/src/zenserver/compute/computeserver.cpp
@@ -0,0 +1,330 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeserver.h"
+#include <zencompute/httpfunctionservice.h>
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/fmtutils.h>
+# include <zencore/memory/llm.h>
+# include <zencore/memory/memorytrace.h>
+# include <zencore/memory/tagtrace.h>
+# include <zencore/scopeguard.h>
+# include <zencore/sentryintegration.h>
+# include <zencore/system.h>
+# include <zencore/windows.h>
+# include <zenhttp/httpapiservice.h>
+# include <zenstore/cidstore.h>
+# include <zenutil/service.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+// Registers the compute-specific command line options in the "compute" group:
+// "upstream-notification-endpoint" and "instance-id". Both are stored as strings
+// in m_ServerOptions and default to the empty string.
+void
+ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
+{
+ Options.add_option("compute",
+ "",
+ "upstream-notification-endpoint",
+ "Endpoint URL for upstream notifications",
+ cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "instance-id",
+ "Instance ID for use in notifications",
+ cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
+ "");
+}
+
+// Currently a no-op: the compute server registers no config-file options.
+void
+ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+// Currently a no-op: nothing is derived from the parsed command line options.
+void
+ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options)
+{
+ ZEN_UNUSED(Options);
+}
+
+// Currently a no-op: nothing is read back from the parsed config file.
+void
+ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
+{
+ ZEN_UNUSED(LuaOptions);
+}
+
+// Currently a no-op: the compute-specific options are not validated.
+void
+ZenComputeServerConfigurator::ValidateOptions()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+ZenComputeServer::ZenComputeServer()
+{
+}
+
+ZenComputeServer::~ZenComputeServer()
+{
+ Cleanup();
+}
+
+/** Brings the compute server up.
+ *
+ * Initializes the base server first; if that fails (negative return value) the
+ * value is passed straight back and nothing else is set up. Otherwise state and
+ * services are created and registered, the base server is finalized, and the
+ * effective base port is returned.
+ */
+int
+ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
+{
+ ZEN_TRACE_CPU("ZenComputeServer::Initialize");
+ ZEN_MEMSCOPE(GetZenserverTag());
+
+ // Run() reports this process as COMPUTE; keep the startup message consistent with it.
+ ZEN_INFO(ZEN_APP_NAME " initializing in COMPUTE server mode");
+
+ const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry);
+ if (EffectiveBasePort < 0)
+ {
+ return EffectiveBasePort;
+ }
+
+ // This is a workaround to make sure we can have automated tests. Without
+ // this the ranges for different child zen server processes could overlap with
+ // the main test range.
+ ZenServerEnvironment::SetBaseChildId(1000);
+
+ // Consumed by Run(), which triggers ZEN_DEBUG_BREAK() when set.
+ m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
+
+ InitializeState(ServerConfig);
+ InitializeServices(ServerConfig);
+ RegisterServices(ServerConfig);
+
+ ZenServerBase::Finalize();
+
+ return EffectiveBasePort;
+}
+
+/** Shuts the server down: stops the I/O context, joins the I/O runner thread if
+ * it is joinable, then closes the HTTP server if one exists.
+ *
+ * Exceptions derived from std::exception are logged and not propagated; the
+ * destructor calls this function.
+ */
+void
+ZenComputeServer::Cleanup()
+{
+ // Trace scope must name this class so profiling data is attributed correctly.
+ ZEN_TRACE_CPU("ZenComputeServer::Cleanup");
+ ZEN_INFO(ZEN_APP_NAME " cleaning up");
+ try
+ {
+ m_IoContext.stop();
+ if (m_IoRunner.joinable())
+ {
+ m_IoRunner.join();
+ }
+
+ if (m_Http)
+ {
+ m_Http->Close();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+ }
+}
+
+void
+ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+}
+
+// Creates the services hosted by the compute server: the CID store (rooted at
+// m_DataRoot / "cas"), the API service, the compute service and the function service.
+void
+ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_INFO("initializing storage");
+
+ CidStoreConfiguration Config;
+ Config.RootDirectory = m_DataRoot / "cas";
+
+ m_CidStore = std::make_unique<CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+
+ ZEN_INFO("instantiating API service");
+ m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
+
+ // NOTE(review): the store above is rooted at m_DataRoot while the services below
+ // use ServerConfig.DataDir - presumably the same directory; confirm.
+ ZEN_INFO("instantiating compute service");
+ m_ComputeService = std::make_unique<HttpComputeService>(ServerConfig.DataDir / "compute");
+
+ // Ref<zen::compute::FunctionRunner> Runner;
+ // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner");
+
+ // TODO: (re)implement default configuration here
+
+ ZEN_INFO("instantiating function service");
+ m_FunctionService =
+ std::make_unique<zen::compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions");
+}
+
+// Hooks every service that has been instantiated into the HTTP server.
+void
+ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig)
+{
+ ZEN_UNUSED(ServerConfig);
+
+ // Null service pointers are skipped.
+ const auto RegisterIfPresent = [this](const auto& Service) {
+ if (Service)
+ {
+ m_Http->RegisterService(*Service);
+ }
+ };
+
+ // Registration order: compute, API, function.
+ RegisterIfPresent(m_ComputeService);
+ RegisterIfPresent(m_ApiService);
+ RegisterIfPresent(m_FunctionService);
+}
+
+// Runs the server: logs the startup banner and build information (unless in
+// test mode), moves to kRunning, signals readiness via OnReady(), hands control
+// to the HTTP server and finally moves to kShuttingDown once that returns.
+void
+ZenComputeServer::Run()
+{
+ if (m_ProcessMonitor.IsActive())
+ {
+ CheckOwnerPid();
+ }
+
+ if (!m_TestMode)
+ {
+ // clang-format off
+ ZEN_INFO( R"(__________ _________ __ )" "\n"
+ R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n"
+ R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n"
+ R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n"
+ R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n"
+ R"( \/ \/ \/ \/ \/|__| \/ )");
+ // clang-format on
+
+ ExtendableStringBuilder<256> BuildOptions;
+ GetBuildOptions(BuildOptions, '\n');
+ ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions);
+ }
+
+ ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId());
+
+# if ZEN_PLATFORM_WINDOWS
+ if (zen::windows::IsRunningOnWine())
+ {
+ ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+ }
+# endif
+
+# if ZEN_USE_SENTRY
+ ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+ if (m_UseSentry)
+ {
+ SentryIntegration::ClearCaches();
+ }
+# endif
+
+ // m_DebugOptionForcedCrash is copied from ServerConfig.ShouldCrash in Initialize().
+ if (m_DebugOptionForcedCrash)
+ {
+ ZEN_DEBUG_BREAK();
+ }
+
+ const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode;
+
+ SetNewState(kRunning);
+
+ OnReady();
+
+ // NOTE(review): presumably blocks until exit has been requested - confirm.
+ m_Http->Run(IsInteractiveMode);
+
+ SetNewState(kShuttingDown);
+
+ ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+
+ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions)
+: ZenServerMain(ServerOptions)
+, m_ServerOptions(ServerOptions)
+{
+}
+
+/** Runs a compute server instance to completion.
+ *
+ * Configures and initializes a ZenComputeServer, records the effective listen
+ * port in Entry, starts a thread that waits on the named shutdown event
+ * "Zen_<port>_Shutdown" and requests exit when it fires, installs the readiness
+ * callback and then calls Server.Run(). Exits the process with code 1 when
+ * initialization fails.
+ */
+void
+ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+ ZenComputeServer Server;
+ Server.SetDataRoot(m_ServerOptions.DataDir);
+ Server.SetContentRoot(m_ServerOptions.ContentDir);
+ Server.SetTestMode(m_ServerOptions.IsTest);
+ Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+ const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+ // Initialize() treats every negative value as failure, so do the same here; a
+ // negative value must never reach the uint16_t conversion below.
+ if (EffectiveBasePort < 0)
+ {
+ // Server.Initialize has already logged what the issue is - just exit with failure code here.
+ std::exit(1);
+ }
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != m_ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ m_ServerOptions.BasePort = EffectiveBasePort;
+ }
+
+ std::unique_ptr<std::thread> ShutdownThread;
+ std::unique_ptr<NamedEvent> ShutdownEvent;
+
+ // The event name is built from the (possibly relocated) base port.
+ ExtendableStringBuilder<64> ShutdownEventName;
+ ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+ ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+ // Monitor shutdown signals
+
+ ShutdownThread.reset(new std::thread{[&] {
+ SetCurrentThreadName("shutdown_mon");
+
+ ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+ Server.RequestExit(0);
+ }
+ else
+ {
+ ZEN_INFO("shutdown signal wait() failed");
+ }
+ }});
+
+ // On scope exit: set the event so the monitor thread's Wait() returns, then join it.
+ // The guard is declared after Server, so it runs before Server is destroyed.
+ auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+ ReportServiceStatus(ServiceStatus::Stopping);
+
+ if (ShutdownEvent)
+ {
+ ShutdownEvent->Set();
+ }
+ if (ShutdownThread && ShutdownThread->joinable())
+ {
+ ShutdownThread->join();
+ }
+ });
+
+ // If we have a parent process, establish the mechanisms we need
+ // to be able to communicate readiness with the parent
+
+ Server.SetIsReadyFunc([&] {
+ // Ec is not inspected: a failed lock file update does not stop readiness from being reported.
+ std::error_code Ec;
+ m_LockFile.Update(MakeLockData(true), Ec);
+ ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
+ });
+
+ Server.Run();
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
new file mode 100644
index 000000000..625140b23
--- /dev/null
+++ b/src/zenserver/compute/computeserver.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenserver.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenstore/gc.h>
+
+namespace cxxopts {
+class Options;
+}
+namespace zen::LuaConfig {
+struct Options;
+}
+
+namespace zen::compute {
+class HttpFunctionService;
+}
+
+namespace zen {
+
+class CidStore;
+class HttpApiService;
+class HttpComputeService;
+
+struct ZenComputeServerConfig : public ZenServerConfig
+{
+ std::string UpstreamNotificationEndpoint;
+ std::string InstanceId; // For use in notifications
+};
+
+/** Command line / config file configurator for the compute server.
+ *
+ * Holds a reference to the ZenComputeServerConfig it populates; the referenced
+ * object must outlive the configurator.
+ */
+struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase
+{
+ ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions)
+ : ZenServerConfiguratorBase(ServerOptions)
+ , m_ServerOptions(ServerOptions)
+ {
+ }
+
+ ~ZenComputeServerConfigurator() = default;
+
+private:
+ // Overrides of the ZenServerConfiguratorBase hooks; defined in computeserver.cpp.
+ virtual void AddCliOptions(cxxopts::Options& Options) override;
+ virtual void AddConfigOptions(LuaConfig::Options& Options) override;
+ virtual void ApplyOptions(cxxopts::Options& Options) override;
+ virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override;
+ virtual void ValidateOptions() override;
+
+ // Same object as the one handed to the base class, typed as the compute config.
+ ZenComputeServerConfig& m_ServerOptions;
+};
+
+/** Main-object for the compute server mode; DoRun() drives a ZenComputeServer
+ * using the options referenced by m_ServerOptions. Not copyable.
+ */
+class ZenComputeServerMain : public ZenServerMain
+{
+public:
+ using Config = ZenComputeServerConfig;
+ using Configurator = ZenComputeServerConfigurator;
+
+ ZenComputeServerMain(ZenComputeServerConfig& ServerOptions);
+
+ ZenComputeServerMain(const ZenComputeServerMain&) = delete;
+ ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete;
+
+ void DoRun(ZenServerState::ZenServerEntry* Entry) override;
+
+private:
+ ZenComputeServerConfig& m_ServerOptions;
+};
+
+/**
+ * The compute server handles DDC build function execution requests
+ * only. It's intended to be used on a pure compute resource and does
+ * not handle any storage tasks. The actual scheduling happens upstream
+ * in a storage server instance.
+ *
+ * NOTE(review): the class nevertheless owns a CidStore and GC manager/scheduler
+ * (see members below) - confirm whether "no storage tasks" still holds.
+ */
+
+class ZenComputeServer : public ZenServerBase
+{
+ // Not movable.
+ ZenComputeServer& operator=(ZenComputeServer&&) = delete;
+ ZenComputeServer(ZenComputeServer&&) = delete;
+
+public:
+ ZenComputeServer();
+ ~ZenComputeServer();
+
+ // Returns the effective base port, or a negative value on failure.
+ int Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry);
+ void Run();
+ // Also invoked from the destructor.
+ void Cleanup();
+
+private:
+ HttpStatsService m_StatsService;
+ GcManager m_GcManager;
+ GcScheduler m_GcScheduler{m_GcManager};
+ std::unique_ptr<CidStore> m_CidStore;
+ std::unique_ptr<HttpComputeService> m_ComputeService;
+ std::unique_ptr<HttpApiService> m_ApiService;
+ std::unique_ptr<zen::compute::HttpFunctionService> m_FunctionService;
+
+ // Called in this order from Initialize().
+ void InitializeState(const ZenComputeServerConfig& ServerConfig);
+ void InitializeServices(const ZenComputeServerConfig& ServerConfig);
+ void RegisterServices(const ZenComputeServerConfig& ServerConfig);
+};
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeservice.cpp b/src/zenserver/compute/computeservice.cpp
new file mode 100644
index 000000000..2c0bc0ae9
--- /dev/null
+++ b/src/zenserver/compute/computeservice.cpp
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/system.h>
+# include <zenutil/zenserverprocess.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/fixed_vector.h>
+# include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+# include <unordered_map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Disk and memory usage counters, in bytes.
+// NOTE(review): not referenced anywhere else in this file - presumably a placeholder.
+struct ResourceMetrics
+{
+ uint64_t DiskUsageBytes = 0;
+ uint64_t MemoryUsageBytes = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+// Private implementation of HttpComputeService. Currently holds no state:
+// Initialize() ignores its argument and Cleanup() does nothing.
+struct HttpComputeService::Impl
+{
+ // Not copyable.
+ Impl(const Impl&) = delete;
+ Impl& operator=(const Impl&) = delete;
+
+ Impl();
+ ~Impl();
+
+ void Initialize(std::filesystem::path BaseDir) { ZEN_UNUSED(BaseDir); }
+
+ void Cleanup() {}
+
+private:
+};
+
+HttpComputeService::Impl::Impl()
+{
+}
+
+HttpComputeService::Impl::~Impl()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+// Creates the implementation object, passes BaseDir to it and registers the
+// GET routes "status" and "stats" on the router.
+HttpComputeService::HttpComputeService(std::filesystem::path BaseDir) : m_Impl(std::make_unique<Impl>())
+{
+ m_Impl->Initialize(BaseDir);
+
+ // "status": responds with an object containing an empty "modules" array.
+ // The handlers use no member state, so nothing is captured.
+ m_Router.RegisterRoute(
+ "status",
+ [](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Obj.BeginArray("modules");
+ Obj.EndArray();
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ // "stats": responds with an empty object.
+ m_Router.RegisterRoute(
+ "stats",
+ [](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+}
+
+HttpComputeService::~HttpComputeService()
+{
+}
+
+const char*
+HttpComputeService::BaseUri() const
+{
+ return "/compute/";
+}
+
+void
+HttpComputeService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ m_Router.HandleRequest(Request);
+}
+
+} // namespace zen
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeservice.h b/src/zenserver/compute/computeservice.h
new file mode 100644
index 000000000..339200dd8
--- /dev/null
+++ b/src/zenserver/compute/computeservice.h
@@ -0,0 +1,36 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+namespace zen {
+
+/** ZenServer Compute Service
+ *
+ * Manages a set of compute workers for use in UEFN content worker
+ *
+ * HTTP service whose requests are dispatched through m_Router. Not copyable.
+ */
+class HttpComputeService : public zen::HttpService
+{
+public:
+ // BaseDir is forwarded to the private implementation object.
+ HttpComputeService(std::filesystem::path BaseDir);
+ ~HttpComputeService();
+
+ HttpComputeService(const HttpComputeService&) = delete;
+ HttpComputeService& operator=(const HttpComputeService&) = delete;
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ HttpRequestRouter m_Router;
+
+ // Impl is only forward-declared here; it is defined in computeservice.cpp.
+ struct Impl;
+
+ std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/frontend/html/compute.html b/src/zenserver/frontend/html/compute.html
new file mode 100644
index 000000000..668189fe5
--- /dev/null
+++ b/src/zenserver/frontend/html/compute.html
@@ -0,0 +1,991 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>Zen Compute Dashboard</title>
+ <script src="https://cdn.jsdelivr.net/npm/chart.js@4/dist/chart.umd.min.js"></script>
+ <style>
+ * {
+ margin: 0;
+ padding: 0;
+ box-sizing: border-box;
+ }
+
+ body {
+ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
+ background: #0d1117;
+ color: #c9d1d9;
+ padding: 20px;
+ }
+
+ .container {
+ max-width: 1400px;
+ margin: 0 auto;
+ }
+
+ h1 {
+ font-size: 32px;
+ font-weight: 600;
+ margin-bottom: 10px;
+ color: #f0f6fc;
+ }
+
+ .header {
+ display: flex;
+ justify-content: space-between;
+ align-items: center;
+ margin-bottom: 30px;
+ }
+
+ .health-indicator {
+ display: flex;
+ align-items: center;
+ gap: 8px;
+ font-size: 14px;
+ padding: 8px 16px;
+ border-radius: 6px;
+ background: #161b22;
+ border: 1px solid #30363d;
+ }
+
+ .health-indicator .status-dot {
+ width: 10px;
+ height: 10px;
+ border-radius: 50%;
+ background: #6e7681;
+ }
+
+ .health-indicator.healthy .status-dot {
+ background: #3fb950;
+ }
+
+ .health-indicator.unhealthy .status-dot {
+ background: #f85149;
+ }
+
+ .grid {
+ display: grid;
+ grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
+ gap: 20px;
+ margin-bottom: 30px;
+ }
+
+ .card {
+ background: #161b22;
+ border: 1px solid #30363d;
+ border-radius: 6px;
+ padding: 20px;
+ }
+
+ .card-title {
+ font-size: 14px;
+ font-weight: 600;
+ color: #8b949e;
+ margin-bottom: 12px;
+ text-transform: uppercase;
+ letter-spacing: 0.5px;
+ }
+
+ .metric-value {
+ font-size: 36px;
+ font-weight: 600;
+ color: #f0f6fc;
+ line-height: 1;
+ }
+
+ .metric-label {
+ font-size: 12px;
+ color: #8b949e;
+ margin-top: 4px;
+ }
+
+ .chart-container {
+ position: relative;
+ height: 300px;
+ margin-top: 20px;
+ }
+
+ .stats-row {
+ display: flex;
+ justify-content: space-between;
+ margin-bottom: 12px;
+ padding: 8px 0;
+ border-bottom: 1px solid #21262d;
+ }
+
+ .stats-row:last-child {
+ border-bottom: none;
+ margin-bottom: 0;
+ }
+
+ .stats-label {
+ color: #8b949e;
+ font-size: 13px;
+ }
+
+ .stats-value {
+ color: #f0f6fc;
+ font-weight: 600;
+ font-size: 13px;
+ }
+
+ .rate-stats {
+ display: grid;
+ grid-template-columns: repeat(3, 1fr);
+ gap: 16px;
+ margin-top: 16px;
+ }
+
+ .rate-item {
+ text-align: center;
+ }
+
+ .rate-value {
+ font-size: 20px;
+ font-weight: 600;
+ color: #58a6ff;
+ }
+
+ .rate-label {
+ font-size: 11px;
+ color: #8b949e;
+ margin-top: 4px;
+ text-transform: uppercase;
+ }
+
+ .progress-bar {
+ width: 100%;
+ height: 8px;
+ background: #21262d;
+ border-radius: 4px;
+ overflow: hidden;
+ margin-top: 8px;
+ }
+
+ .progress-fill {
+ height: 100%;
+ background: #58a6ff;
+ transition: width 0.3s ease;
+ }
+
+ .timestamp {
+ font-size: 12px;
+ color: #6e7681;
+ text-align: right;
+ margin-top: 30px;
+ }
+
+ .error {
+ color: #f85149;
+ padding: 12px;
+ background: #1c1c1c;
+ border-radius: 6px;
+ margin: 20px 0;
+ font-size: 13px;
+ }
+
+ .section-title {
+ font-size: 20px;
+ font-weight: 600;
+ margin-bottom: 20px;
+ color: #f0f6fc;
+ }
+
+ .worker-row {
+ cursor: pointer;
+ transition: background 0.15s;
+ }
+
+ .worker-row:hover {
+ background: #1c2128;
+ }
+
+ .worker-row.selected {
+ background: #1f2d3d;
+ }
+
+ .worker-detail {
+ margin-top: 20px;
+ border-top: 1px solid #30363d;
+ padding-top: 16px;
+ }
+
+ .worker-detail-title {
+ font-size: 15px;
+ font-weight: 600;
+ color: #f0f6fc;
+ margin-bottom: 12px;
+ }
+
+ .detail-section {
+ margin-bottom: 16px;
+ }
+
+ .detail-section-label {
+ font-size: 11px;
+ font-weight: 600;
+ color: #8b949e;
+ text-transform: uppercase;
+ letter-spacing: 0.5px;
+ margin-bottom: 6px;
+ }
+
+ .detail-table {
+ width: 100%;
+ border-collapse: collapse;
+ font-size: 12px;
+ }
+
+ .detail-table td {
+ padding: 4px 8px;
+ color: #c9d1d9;
+ border-bottom: 1px solid #21262d;
+ vertical-align: top;
+ }
+
+ .detail-table td:first-child {
+ color: #8b949e;
+ width: 40%;
+ font-family: monospace;
+ }
+
+ .detail-table tr:last-child td {
+ border-bottom: none;
+ }
+
+ .detail-mono {
+ font-family: monospace;
+ font-size: 11px;
+ color: #8b949e;
+ }
+
+ .detail-tag {
+ display: inline-block;
+ padding: 2px 8px;
+ border-radius: 4px;
+ background: #21262d;
+ color: #c9d1d9;
+ font-size: 11px;
+ margin: 2px 4px 2px 0;
+ }
+
+ .status-badge {
+ display: inline-block;
+ padding: 2px 8px;
+ border-radius: 4px;
+ font-size: 11px;
+ font-weight: 600;
+ }
+
+ .status-badge.success {
+ background: rgba(63, 185, 80, 0.15);
+ color: #3fb950;
+ }
+
+ .status-badge.failure {
+ background: rgba(248, 81, 73, 0.15);
+ color: #f85149;
+ }
+ </style>
+</head>
+<body>
+ <div class="container">
+ <div class="header">
+ <div>
+ <h1>Zen Compute Dashboard</h1>
+ <div class="timestamp">Last updated: <span id="last-update">Never</span></div>
+ </div>
+ <div class="health-indicator" id="health-indicator">
+ <div class="status-dot"></div>
+ <span id="health-text">Checking...</span>
+ </div>
+ </div>
+
+ <div id="error-container"></div>
+
+ <!-- Action Queue Stats -->
+ <div class="section-title">Action Queue</div>
+ <div class="grid">
+ <div class="card">
+ <div class="card-title">Pending Actions</div>
+ <div class="metric-value" id="actions-pending">-</div>
+ <div class="metric-label">Waiting to be scheduled</div>
+ </div>
+ <div class="card">
+ <div class="card-title">Running Actions</div>
+ <div class="metric-value" id="actions-running">-</div>
+ <div class="metric-label">Currently executing</div>
+ </div>
+ <div class="card">
+ <div class="card-title">Completed Actions</div>
+ <div class="metric-value" id="actions-complete">-</div>
+ <div class="metric-label">Results available</div>
+ </div>
+ </div>
+
+ <!-- Action Queue Chart -->
+ <div class="card" style="margin-bottom: 30px;">
+ <div class="card-title">Action Queue History</div>
+ <div class="chart-container">
+ <canvas id="queue-chart"></canvas>
+ </div>
+ </div>
+
+ <!-- Performance Metrics -->
+ <div class="section-title">Performance Metrics</div>
+ <div class="card" style="margin-bottom: 30px;">
+ <div class="card-title">Completion Rate</div>
+ <div class="rate-stats">
+ <div class="rate-item">
+ <div class="rate-value" id="rate-1">-</div>
+ <div class="rate-label">1 min rate</div>
+ </div>
+ <div class="rate-item">
+ <div class="rate-value" id="rate-5">-</div>
+ <div class="rate-label">5 min rate</div>
+ </div>
+ <div class="rate-item">
+ <div class="rate-value" id="rate-15">-</div>
+ <div class="rate-label">15 min rate</div>
+ </div>
+ </div>
+ <div style="margin-top: 20px;">
+ <div class="stats-row">
+ <span class="stats-label">Total Retired</span>
+ <span class="stats-value" id="retired-count">-</span>
+ </div>
+ <div class="stats-row">
+ <span class="stats-label">Mean Rate</span>
+ <span class="stats-value" id="rate-mean">-</span>
+ </div>
+ </div>
+ </div>
+
+ <!-- Workers -->
+ <div class="section-title">Workers</div>
+ <div class="card" style="margin-bottom: 30px;">
+ <div class="card-title">Worker Status</div>
+ <div class="stats-row">
+ <span class="stats-label">Registered Workers</span>
+ <span class="stats-value" id="worker-count">-</span>
+ </div>
+ <div id="worker-table-container" style="margin-top: 16px; display: none;">
+ <table id="worker-table" style="width: 100%; border-collapse: collapse; font-size: 13px;">
+ <thead>
+ <tr>
+ <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Name</th>
+ <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Platform</th>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Cores</th>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Timeout</th>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Functions</th>
+ <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Worker ID</th>
+ </tr>
+ </thead>
+ <tbody id="worker-table-body"></tbody>
+ </table>
+ <div id="worker-detail" class="worker-detail" style="display: none;"></div>
+ </div>
+ </div>
+
+ <!-- Action History -->
+ <div class="section-title">Recent Actions</div>
+ <div class="card" style="margin-bottom: 30px;">
+ <div class="card-title">Action History</div>
+ <div id="action-history-empty" style="color: #6e7681; font-size: 13px;">No actions recorded yet.</div>
+ <div id="action-history-container" style="display: none;">
+ <table id="action-history-table" style="width: 100%; border-collapse: collapse; font-size: 13px;">
+ <thead>
+ <tr>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 60px;">LSN</th>
+ <th style="text-align: center; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 70px;">Status</th>
+ <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Function</th>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 80px;">Started</th>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 80px;">Finished</th>
+ <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 80px;">Duration</th>
+ <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Worker ID</th>
+ <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Action ID</th>
+ </tr>
+ </thead>
+ <tbody id="action-history-body"></tbody>
+ </table>
+ </div>
+ </div>
+
+ <!-- System Resources -->
+ <div class="section-title">System Resources</div>
+ <div class="grid">
+ <div class="card">
+ <div class="card-title">CPU Usage</div>
+ <div class="metric-value" id="cpu-usage">-</div>
+ <div class="metric-label">Percent</div>
+ <div class="progress-bar">
+ <div class="progress-fill" id="cpu-progress" style="width: 0%"></div>
+ </div>
+ <div style="position: relative; height: 60px; margin-top: 12px;">
+ <canvas id="cpu-chart"></canvas>
+ </div>
+ <div style="margin-top: 12px;">
+ <div class="stats-row">
+ <span class="stats-label">Packages</span>
+ <span class="stats-value" id="cpu-packages">-</span>
+ </div>
+ <div class="stats-row">
+ <span class="stats-label">Physical Cores</span>
+ <span class="stats-value" id="cpu-cores">-</span>
+ </div>
+ <div class="stats-row">
+ <span class="stats-label">Logical Processors</span>
+ <span class="stats-value" id="cpu-lp">-</span>
+ </div>
+ </div>
+ </div>
+ <div class="card">
+ <div class="card-title">Memory</div>
+ <div class="stats-row">
+ <span class="stats-label">Used</span>
+ <span class="stats-value" id="memory-used">-</span>
+ </div>
+ <div class="stats-row">
+ <span class="stats-label">Total</span>
+ <span class="stats-value" id="memory-total">-</span>
+ </div>
+ <div class="progress-bar">
+ <div class="progress-fill" id="memory-progress" style="width: 0%"></div>
+ </div>
+ </div>
+ <div class="card">
+ <div class="card-title">Disk</div>
+ <div class="stats-row">
+ <span class="stats-label">Used</span>
+ <span class="stats-value" id="disk-used">-</span>
+ </div>
+ <div class="stats-row">
+ <span class="stats-label">Total</span>
+ <span class="stats-value" id="disk-total">-</span>
+ </div>
+ <div class="progress-bar">
+ <div class="progress-fill" id="disk-progress" style="width: 0%"></div>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ <script>
+ // Configuration: where requests are sent and how often the dashboard polls
+ const BASE_URL = window.location.origin;
+ const REFRESH_INTERVAL = 2000; // polling period passed to setInterval, in milliseconds (2 seconds)
+ const MAX_HISTORY_POINTS = 60; // samples kept per series; at one sample per 2 s refresh this shows the last 2 minutes
+
+ // Rolling sample buffers backing the charts; each is trimmed to MAX_HISTORY_POINTS entries by the fetch functions
+ const history = {
+ timestamps: [],
+ pending: [],
+ running: [],
+ completed: [],
+ cpu: []
+ };
+
+ // CPU sparkline chart
+ const cpuCtx = document.getElementById('cpu-chart').getContext('2d');
+ const cpuChart = new Chart(cpuCtx, {
+ type: 'line',
+ data: {
+ labels: [],
+ datasets: [{
+ data: [],
+ borderColor: '#58a6ff',
+ backgroundColor: 'rgba(88, 166, 255, 0.15)',
+ borderWidth: 1.5,
+ tension: 0.4,
+ fill: true,
+ pointRadius: 0
+ }]
+ },
+ options: {
+ responsive: true,
+ maintainAspectRatio: false,
+ animation: false,
+ plugins: { legend: { display: false }, tooltip: { enabled: false } },
+ scales: {
+ x: { display: false },
+ y: { display: false, min: 0, max: 100 }
+ }
+ }
+ });
+
+ // Queue chart setup
+ const ctx = document.getElementById('queue-chart').getContext('2d');
+ const chart = new Chart(ctx, {
+ type: 'line',
+ data: {
+ labels: [],
+ datasets: [
+ {
+ label: 'Pending',
+ data: [],
+ borderColor: '#f0883e',
+ backgroundColor: 'rgba(240, 136, 62, 0.1)',
+ tension: 0.4,
+ fill: true
+ },
+ {
+ label: 'Running',
+ data: [],
+ borderColor: '#58a6ff',
+ backgroundColor: 'rgba(88, 166, 255, 0.1)',
+ tension: 0.4,
+ fill: true
+ },
+ {
+ label: 'Completed',
+ data: [],
+ borderColor: '#3fb950',
+ backgroundColor: 'rgba(63, 185, 80, 0.1)',
+ tension: 0.4,
+ fill: true
+ }
+ ]
+ },
+ options: {
+ responsive: true,
+ maintainAspectRatio: false,
+ plugins: {
+ legend: {
+ display: true,
+ labels: {
+ color: '#8b949e'
+ }
+ }
+ },
+ scales: {
+ x: {
+ display: false
+ },
+ y: {
+ beginAtZero: true,
+ ticks: {
+ color: '#8b949e'
+ },
+ grid: {
+ color: '#21262d'
+ }
+ }
+ }
+ }
+ });
+
+ // Helper functions
+ function formatBytes(bytes) {
+ if (bytes === 0) return '0 B';
+ const k = 1024;
+ const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
+ const i = Math.floor(Math.log(bytes) / Math.log(k));
+ return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
+ }
+
+ function formatRate(rate) {
+ return rate.toFixed(2) + '/s';
+ }
+
+ /**
+  * Shows an error banner inside #error-container, replacing any previous banner.
+  *
+  * @param {string} message - human-readable error text
+  *
+  * The message is assigned through textContent rather than interpolated into innerHTML:
+  * it can carry server-provided text (fetchJSON puts the HTTP status text into the
+  * error it throws) and must not be interpreted as markup.
+  */
+ function showError(message) {
+     const container = document.getElementById('error-container');
+     const banner = document.createElement('div');
+     banner.className = 'error';
+     banner.textContent = `Error: ${message}`;
+     container.innerHTML = '';
+     container.appendChild(banner);
+ }
+
+ function clearError() {
+ document.getElementById('error-container').innerHTML = '';
+ }
+
+ function updateTimestamp() {
+ const now = new Date();
+ document.getElementById('last-update').textContent = now.toLocaleTimeString();
+ }
+
+ // Fetch functions
+ async function fetchJSON(endpoint) {
+ const response = await fetch(`${BASE_URL}${endpoint}`, {
+ headers: {
+ 'Accept': 'application/json'
+ }
+ });
+ if (!response.ok) {
+ throw new Error(`HTTP ${response.status}: ${response.statusText}`);
+ }
+ return await response.json();
+ }
+
+ async function fetchHealth() {
+ try {
+ const response = await fetch(`${BASE_URL}/apply/ready`);
+ const isHealthy = response.status === 200;
+
+ const indicator = document.getElementById('health-indicator');
+ const text = document.getElementById('health-text');
+
+ if (isHealthy) {
+ indicator.classList.add('healthy');
+ indicator.classList.remove('unhealthy');
+ text.textContent = 'Healthy';
+ } else {
+ indicator.classList.add('unhealthy');
+ indicator.classList.remove('healthy');
+ text.textContent = 'Unhealthy';
+ }
+
+ return isHealthy;
+ } catch (error) {
+ const indicator = document.getElementById('health-indicator');
+ const text = document.getElementById('health-text');
+ indicator.classList.add('unhealthy');
+ indicator.classList.remove('healthy');
+ text.textContent = 'Error';
+ throw error;
+ }
+ }
+
+ async function fetchStats() {
+ const data = await fetchJSON('/stats/apply');
+
+ // Update action counts
+ document.getElementById('actions-pending').textContent = data.actions_pending || 0;
+ document.getElementById('actions-running').textContent = data.actions_submitted || 0;
+ document.getElementById('actions-complete').textContent = data.actions_complete || 0;
+
+ // Update completion rates
+ if (data.actions_retired) {
+ document.getElementById('rate-1').textContent = formatRate(data.actions_retired.rate_1 || 0);
+ document.getElementById('rate-5').textContent = formatRate(data.actions_retired.rate_5 || 0);
+ document.getElementById('rate-15').textContent = formatRate(data.actions_retired.rate_15 || 0);
+ document.getElementById('retired-count').textContent = data.actions_retired.count || 0;
+ document.getElementById('rate-mean').textContent = formatRate(data.actions_retired.rate_mean || 0);
+ }
+
+ // Update chart
+ const now = new Date().toLocaleTimeString();
+ history.timestamps.push(now);
+ history.pending.push(data.actions_pending || 0);
+ history.running.push(data.actions_submitted || 0);
+ history.completed.push(data.actions_complete || 0);
+
+ // Keep only last N points
+ if (history.timestamps.length > MAX_HISTORY_POINTS) {
+ history.timestamps.shift();
+ history.pending.shift();
+ history.running.shift();
+ history.completed.shift();
+ }
+
+ chart.data.labels = history.timestamps;
+ chart.data.datasets[0].data = history.pending;
+ chart.data.datasets[1].data = history.running;
+ chart.data.datasets[2].data = history.completed;
+ chart.update('none');
+ }
+
+ async function fetchSysInfo() {
+ const data = await fetchJSON('/apply/sysinfo');
+
+ // Update CPU
+ const cpuUsage = data.cpu_usage || 0;
+ document.getElementById('cpu-usage').textContent = cpuUsage.toFixed(1) + '%';
+ document.getElementById('cpu-progress').style.width = cpuUsage + '%';
+
+ history.cpu.push(cpuUsage);
+ if (history.cpu.length > MAX_HISTORY_POINTS) history.cpu.shift();
+ cpuChart.data.labels = history.cpu.map(() => '');
+ cpuChart.data.datasets[0].data = history.cpu;
+ cpuChart.update('none');
+
+ document.getElementById('cpu-packages').textContent = data.cpu_count ?? '-';
+ document.getElementById('cpu-cores').textContent = data.core_count ?? '-';
+ document.getElementById('cpu-lp').textContent = data.lp_count ?? '-';
+
+ // Update Memory
+ const memUsed = data.memory_used || 0;
+ const memTotal = data.memory_total || 1;
+ const memPercent = (memUsed / memTotal) * 100;
+ document.getElementById('memory-used').textContent = formatBytes(memUsed);
+ document.getElementById('memory-total').textContent = formatBytes(memTotal);
+ document.getElementById('memory-progress').style.width = memPercent + '%';
+
+ // Update Disk
+ const diskUsed = data.disk_used || 0;
+ const diskTotal = data.disk_total || 1;
+ const diskPercent = (diskUsed / diskTotal) * 100;
+ document.getElementById('disk-used').textContent = formatBytes(diskUsed);
+ document.getElementById('disk-total').textContent = formatBytes(diskTotal);
+ document.getElementById('disk-progress').style.width = diskPercent + '%';
+ }
+
+ // Persists the selected worker ID across refreshes
+ let selectedWorkerId = null;
+
+ function renderWorkerDetail(id, desc) {
+ const panel = document.getElementById('worker-detail');
+
+ if (!desc) {
+ panel.style.display = 'none';
+ return;
+ }
+
+ function field(label, value) {
+ return `<tr><td>${label}</td><td>${value ?? '-'}</td></tr>`;
+ }
+
+ function monoField(label, value) {
+ return `<tr><td>${label}</td><td class="detail-mono">${value ?? '-'}</td></tr>`;
+ }
+
+ // Functions
+ const functions = desc.functions || [];
+ const functionsHtml = functions.length === 0 ? '<span style="color:#6e7681;font-size:12px;">none</span>' :
+ `<table class="detail-table">${functions.map(f =>
+ `<tr><td>${f.name || '-'}</td><td class="detail-mono">${f.version || '-'}</td></tr>`
+ ).join('')}</table>`;
+
+ // Executables
+ const executables = desc.executables || [];
+ const totalExecSize = executables.reduce((sum, e) => sum + (e.size || 0), 0);
+ const execHtml = executables.length === 0 ? '<span style="color:#6e7681;font-size:12px;">none</span>' :
+ `<table class="detail-table">
+ <tr style="font-size:11px;">
+ <td style="color:#6e7681;padding-bottom:4px;">Path</td>
+ <td style="color:#6e7681;padding-bottom:4px;">Hash</td>
+ <td style="color:#6e7681;padding-bottom:4px;text-align:right;">Size</td>
+ </tr>
+ ${executables.map(e =>
+ `<tr>
+ <td>${e.name || '-'}</td>
+ <td class="detail-mono">${e.hash || '-'}</td>
+ <td style="text-align:right;white-space:nowrap;">${e.size != null ? formatBytes(e.size) : '-'}</td>
+ </tr>`
+ ).join('')}
+ <tr style="border-top:1px solid #30363d;">
+ <td style="color:#8b949e;padding-top:6px;">Total</td>
+ <td></td>
+ <td style="text-align:right;white-space:nowrap;padding-top:6px;color:#f0f6fc;font-weight:600;">${formatBytes(totalExecSize)}</td>
+ </tr>
+ </table>`;
+
+ // Files
+ const files = desc.files || [];
+ const filesHtml = files.length === 0 ? '<span style="color:#6e7681;font-size:12px;">none</span>' :
+ `<table class="detail-table">${files.map(f =>
+ `<tr><td>${f.name || f}</td><td class="detail-mono">${f.hash || ''}</td></tr>`
+ ).join('')}</table>`;
+
+ // Dirs
+ const dirs = desc.dirs || [];
+ const dirsHtml = dirs.length === 0 ? '<span style="color:#6e7681;font-size:12px;">none</span>' :
+ dirs.map(d => `<span class="detail-tag">${d}</span>`).join('');
+
+ // Environment
+ const env = desc.environment || [];
+ const envHtml = env.length === 0 ? '<span style="color:#6e7681;font-size:12px;">none</span>' :
+ env.map(e => `<span class="detail-tag">${e}</span>`).join('');
+
+ panel.innerHTML = `
+ <div class="worker-detail-title">${desc.name || id}</div>
+ <div class="detail-section">
+ <table class="detail-table">
+ ${field('Worker ID', `<span class="detail-mono">${id}</span>`)}
+ ${field('Path', desc.path)}
+ ${field('Platform', desc.host)}
+ ${monoField('Build System', desc.buildsystem_version)}
+ ${field('Cores', desc.cores)}
+ ${field('Timeout', desc.timeout != null ? desc.timeout + 's' : null)}
+ </table>
+ </div>
+ <div class="detail-section">
+ <div class="detail-section-label">Functions</div>
+ ${functionsHtml}
+ </div>
+ <div class="detail-section">
+ <div class="detail-section-label">Executables</div>
+ ${execHtml}
+ </div>
+ <div class="detail-section">
+ <div class="detail-section-label">Files</div>
+ ${filesHtml}
+ </div>
+ <div class="detail-section">
+ <div class="detail-section-label">Directories</div>
+ ${dirsHtml}
+ </div>
+ <div class="detail-section">
+ <div class="detail-section-label">Environment</div>
+ ${envHtml}
+ </div>
+ `;
+ panel.style.display = 'block';
+ }
+
+ async function fetchWorkers() {
+ const data = await fetchJSON('/apply/workers');
+ const workerIds = data.workers || [];
+
+ document.getElementById('worker-count').textContent = workerIds.length;
+
+ const container = document.getElementById('worker-table-container');
+ const tbody = document.getElementById('worker-table-body');
+
+ if (workerIds.length === 0) {
+ container.style.display = 'none';
+ selectedWorkerId = null;
+ return;
+ }
+
+ const descriptors = await Promise.all(
+ workerIds.map(id => fetchJSON(`/apply/workers/${id}`).catch(() => null))
+ );
+
+ // Build a map for quick lookup by ID
+ const descriptorMap = {};
+ workerIds.forEach((id, i) => { descriptorMap[id] = descriptors[i]; });
+
+ tbody.innerHTML = '';
+ descriptors.forEach((desc, i) => {
+ const id = workerIds[i];
+ const name = desc ? (desc.name || '-') : '-';
+ const host = desc ? (desc.host || '-') : '-';
+ const cores = desc ? (desc.cores != null ? desc.cores : '-') : '-';
+ const timeout = desc ? (desc.timeout != null ? desc.timeout + 's' : '-') : '-';
+ const functions = desc ? (desc.functions ? desc.functions.length : 0) : '-';
+
+ const tr = document.createElement('tr');
+ tr.className = 'worker-row' + (id === selectedWorkerId ? ' selected' : '');
+ tr.dataset.workerId = id;
+ tr.innerHTML = `
+ <td style="padding: 6px 8px; color: #f0f6fc; border-bottom: 1px solid #21262d;">${name}</td>
+ <td style="padding: 6px 8px; color: #c9d1d9; border-bottom: 1px solid #21262d;">${host}</td>
+ <td style="padding: 6px 8px; color: #c9d1d9; border-bottom: 1px solid #21262d; text-align: right;">${cores}</td>
+ <td style="padding: 6px 8px; color: #c9d1d9; border-bottom: 1px solid #21262d; text-align: right;">${timeout}</td>
+ <td style="padding: 6px 8px; color: #c9d1d9; border-bottom: 1px solid #21262d; text-align: right;">${functions}</td>
+ <td style="padding: 6px 8px; color: #8b949e; border-bottom: 1px solid #21262d; font-family: monospace; font-size: 11px;">${id}</td>
+ `;
+ tr.addEventListener('click', () => {
+ document.querySelectorAll('.worker-row').forEach(r => r.classList.remove('selected'));
+ if (selectedWorkerId === id) {
+ // Toggle off
+ selectedWorkerId = null;
+ document.getElementById('worker-detail').style.display = 'none';
+ } else {
+ selectedWorkerId = id;
+ tr.classList.add('selected');
+ renderWorkerDetail(id, descriptorMap[id]);
+ }
+ });
+ tbody.appendChild(tr);
+ });
+
+ // Re-render detail if selected worker is still present
+ if (selectedWorkerId && descriptorMap[selectedWorkerId]) {
+ renderWorkerDetail(selectedWorkerId, descriptorMap[selectedWorkerId]);
+ } else if (selectedWorkerId && !descriptorMap[selectedWorkerId]) {
+ selectedWorkerId = null;
+ document.getElementById('worker-detail').style.display = 'none';
+ }
+
+ container.style.display = 'block';
+ }
+
+ // Windows FILETIME: 100ns ticks since 1601-01-01. Convert to JS Date.
+ const FILETIME_EPOCH_OFFSET_MS = 11644473600000n;
+ function filetimeToDate(ticks) {
+ if (!ticks) return null;
+ const ms = BigInt(ticks) / 10000n - FILETIME_EPOCH_OFFSET_MS;
+ return new Date(Number(ms));
+ }
+
+ function formatTime(date) {
+ if (!date) return '-';
+ return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', second: '2-digit' });
+ }
+
+ function formatDuration(startDate, endDate) {
+ if (!startDate || !endDate) return '-';
+ const ms = endDate - startDate;
+ if (ms < 0) return '-';
+ if (ms < 1000) return ms + ' ms';
+ if (ms < 60000) return (ms / 1000).toFixed(2) + ' s';
+ const m = Math.floor(ms / 60000);
+ const s = ((ms % 60000) / 1000).toFixed(0).padStart(2, '0');
+ return `${m}m ${s}s`;
+ }
+
+ async function fetchActionHistory() {
+ const data = await fetchJSON('/apply/jobs/history?limit=50');
+ const entries = data.history || [];
+
+ const empty = document.getElementById('action-history-empty');
+ const container = document.getElementById('action-history-container');
+ const tbody = document.getElementById('action-history-body');
+
+ if (entries.length === 0) {
+ empty.style.display = '';
+ container.style.display = 'none';
+ return;
+ }
+
+ empty.style.display = 'none';
+ tbody.innerHTML = '';
+
+ // Entries arrive oldest-first; reverse to show newest at top
+ for (const entry of [...entries].reverse()) {
+ const lsn = entry.lsn ?? '-';
+ const succeeded = entry.succeeded;
+ const badge = succeeded == null
+ ? '<span class="status-badge" style="background:#21262d;color:#8b949e;">unknown</span>'
+ : succeeded
+ ? '<span class="status-badge success">ok</span>'
+ : '<span class="status-badge failure">failed</span>';
+ const desc = entry.actionDescriptor || {};
+ const fn = desc.Function || '-';
+ const workerId = entry.workerId || '-';
+ const actionId = entry.actionId || '-';
+
+ const startDate = filetimeToDate(entry.time_Running);
+ const endDate = filetimeToDate(entry.time_Completed ?? entry.time_Failed);
+
+ const tr = document.createElement('tr');
+ tr.innerHTML = `
+ <td style="padding: 6px 8px; color: #8b949e; border-bottom: 1px solid #21262d; text-align: right; font-family: monospace;">${lsn}</td>
+ <td style="padding: 6px 8px; border-bottom: 1px solid #21262d; text-align: center;">${badge}</td>
+ <td style="padding: 6px 8px; color: #f0f6fc; border-bottom: 1px solid #21262d;">${fn}</td>
+ <td style="padding: 6px 8px; color: #8b949e; border-bottom: 1px solid #21262d; text-align: right; font-size: 12px; white-space: nowrap;">${formatTime(startDate)}</td>
+ <td style="padding: 6px 8px; color: #8b949e; border-bottom: 1px solid #21262d; text-align: right; font-size: 12px; white-space: nowrap;">${formatTime(endDate)}</td>
+ <td style="padding: 6px 8px; color: #c9d1d9; border-bottom: 1px solid #21262d; text-align: right; font-size: 12px; white-space: nowrap;">${formatDuration(startDate, endDate)}</td>
+ <td style="padding: 6px 8px; color: #8b949e; border-bottom: 1px solid #21262d; font-family: monospace; font-size: 11px;">${workerId}</td>
+ <td style="padding: 6px 8px; color: #8b949e; border-bottom: 1px solid #21262d; font-family: monospace; font-size: 11px;">${actionId}</td>
+ `;
+ tbody.appendChild(tr);
+ }
+
+ container.style.display = 'block';
+ }
+
+ async function updateDashboard() {
+ try {
+ await Promise.all([
+ fetchHealth(),
+ fetchStats(),
+ fetchSysInfo(),
+ fetchWorkers(),
+ fetchActionHistory()
+ ]);
+
+ clearError();
+ updateTimestamp();
+ } catch (error) {
+ console.error('Error updating dashboard:', error);
+ showError(error.message);
+ }
+ }
+
+ // Start updating
+ updateDashboard();
+ setInterval(updateDashboard, REFRESH_INTERVAL);
+ </script>
+</body>
+</html>
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 1a929b026..ee783d2a6 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -23,6 +23,9 @@
#include <zenutil/service.h>
#include "diag/logging.h"
+
+#include "compute/computeserver.h"
+
#include "storage/storageconfig.h"
#include "storage/zenstorageserver.h"
@@ -61,11 +64,19 @@ namespace zen {
#if ZEN_PLATFORM_WINDOWS
-template<class T>
+/** Windows Service wrapper for Zen servers
+ *
+ * This class wraps a Zen server main entry point (the Main template parameter)
+ * into a Windows Service by implementing the WindowsService interface.
+ *
+ * The Main type needs to implement the virtual functions from the ZenServerMain
+ * base class, which provides the actual server logic.
+ */
+template<class Main>
class ZenWindowsService : public WindowsService
{
public:
- ZenWindowsService(typename T::Config& ServerOptions) : m_EntryPoint(ServerOptions) {}
+ ZenWindowsService(typename Main::Config& ServerOptions) : m_EntryPoint(ServerOptions) {}
ZenWindowsService(const ZenWindowsService&) = delete;
ZenWindowsService& operator=(const ZenWindowsService&) = delete;
@@ -73,7 +84,7 @@ public:
virtual int Run() override { return m_EntryPoint.Run(); }
private:
- T m_EntryPoint;
+ Main m_EntryPoint;
};
#endif // ZEN_PLATFORM_WINDOWS
@@ -84,6 +95,23 @@ private:
namespace zen {
+/** Application main entry point template
+ *
+ * This function handles common application startup tasks while allowing
+ * different server types to be plugged in via the Main template parameter.
+ *
+ * On Windows, this function also handles platform-specific service
+ * installation and uninstallation.
+ *
+ * The Main type needs to implement the virtual functions from the ZenServerMain
+ * base class, which provides the actual server logic.
+ *
+ * The Main type is also expected to provide the following members:
+ *
+ * typedef Config -- Server configuration type, derived from ZenServerConfig
+ * typedef Configurator -- Server configuration handler type, implements ZenServerConfiguratorBase
+ *
+ */
template<class Main>
int
AppMain(int argc, char* argv[])
@@ -241,7 +269,12 @@ main(int argc, char* argv[])
auto _ = zen::MakeGuard([] {
// Allow some time for worker threads to unravel, in an effort
- // to prevent shutdown races in TLS object destruction
+ // to prevent shutdown races in TLS object destruction, mainly due to
+ // threads which we don't directly control (Windows thread pool) and
+ // therefore can't join.
+ //
+ // This isn't a great solution, but for now it seems to help reduce
+ // shutdown crashes observed in some situations.
WaitForThreads(1000);
});
@@ -249,6 +282,7 @@ main(int argc, char* argv[])
{
kHub,
kStore,
+ kCompute,
kTest
} ServerMode = kStore;
@@ -258,10 +292,14 @@ main(int argc, char* argv[])
{
ServerMode = kHub;
}
- else if (argv[1] == "store"sv)
+ else if ((argv[1] == "store"sv) || (argv[1] == "storage"sv))
{
ServerMode = kStore;
}
+ else if (argv[1] == "compute"sv)
+ {
+ ServerMode = kCompute;
+ }
else if (argv[1] == "test"sv)
{
ServerMode = kTest;
@@ -280,6 +318,13 @@ main(int argc, char* argv[])
break;
case kHub:
return AppMain<ZenHubServerMain>(argc, argv);
+ case kCompute:
+#if ZEN_WITH_COMPUTE_SERVICES
+ return AppMain<ZenComputeServerMain>(argc, argv);
+#else
+ fprintf(stderr, "compute services are not compiled in!\n");
+ exit(5);
+#endif
default:
case kStore:
return AppMain<ZenStorageServerMain>(argc, argv);
diff --git a/src/zenserver/storage/storageconfig.cpp b/src/zenserver/storage/storageconfig.cpp
index 0f8ab1e98..089b6b572 100644
--- a/src/zenserver/storage/storageconfig.cpp
+++ b/src/zenserver/storage/storageconfig.cpp
@@ -797,6 +797,7 @@ ZenStorageServerCmdLineOptions::AddCacheOptions(cxxopts::Options& options, ZenSt
cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"),
"");
+ options.add_option("compute", "", "lie-cpus", "Lie to upstream about CPU capabilities", cxxopts::value<int>(ServerOptions.LieCpu), "");
options.add_option("cache",
"",
"cache-bucket-maxblocksize",
diff --git a/src/zenserver/storage/storageconfig.h b/src/zenserver/storage/storageconfig.h
index d59d05cf6..b408b0c26 100644
--- a/src/zenserver/storage/storageconfig.h
+++ b/src/zenserver/storage/storageconfig.h
@@ -156,6 +156,7 @@ struct ZenStorageServerConfig : public ZenServerConfig
ZenWorkspacesConfig WorksSpacesConfig;
std::filesystem::path PluginsConfigFile; // Path to plugins config file
bool ObjectStoreEnabled = false;
+ bool ComputeEnabled = true;
std::string ScrubOptions;
};
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
index 2b74395c3..ff854b72d 100644
--- a/src/zenserver/storage/zenstorageserver.cpp
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -182,6 +182,13 @@ ZenStorageServer::RegisterServices()
#endif // ZEN_WITH_VFS
m_Http->RegisterService(*m_AdminService);
+
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (m_HttpFunctionService)
+ {
+ m_Http->RegisterService(*m_HttpFunctionService);
+ }
+#endif
}
void
@@ -267,6 +274,16 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions
m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatusService, m_StatsService, *m_BuildStore);
}
+#if ZEN_WITH_COMPUTE_SERVICES
+ if (ServerOptions.ComputeEnabled)
+ {
+ ZEN_OTEL_SPAN("InitializeComputeService");
+
+ m_HttpFunctionService =
+ std::make_unique<compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerOptions.DataDir / "functions");
+ }
+#endif
+
#if ZEN_WITH_VFS
m_VfsServiceImpl = std::make_unique<VfsServiceImpl>();
m_VfsServiceImpl->AddService(Ref<ProjectStore>(m_ProjectStore));
@@ -805,6 +822,10 @@ ZenStorageServer::Cleanup()
Flush();
+#if ZEN_WITH_COMPUTE_SERVICES
+ m_HttpFunctionService.reset();
+#endif
+
m_AdminService.reset();
m_VfsService.reset();
m_VfsServiceImpl.reset();
diff --git a/src/zenserver/storage/zenstorageserver.h b/src/zenserver/storage/zenstorageserver.h
index 5ccb587d6..456447a2a 100644
--- a/src/zenserver/storage/zenstorageserver.h
+++ b/src/zenserver/storage/zenstorageserver.h
@@ -6,6 +6,7 @@
#include <zenhttp/auth/authmgr.h>
#include <zenhttp/auth/authservice.h>
+#include <zenhttp/httpapiservice.h>
#include <zenhttp/httptest.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
@@ -23,6 +24,10 @@
#include "vfs/vfsservice.h"
#include "workspaces/httpworkspaces.h"
+#if ZEN_WITH_COMPUTE_SERVICES
+# include <zencompute/httpfunctionservice.h>
+#endif
+
namespace zen {
class ZenStorageServer : public ZenServerBase
@@ -34,11 +39,6 @@ public:
ZenStorageServer();
~ZenStorageServer();
- void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
- void SetTestMode(bool State) { m_TestMode = State; }
- void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
- void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
-
int Initialize(const ZenStorageServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry);
void Run();
void Cleanup();
@@ -48,14 +48,9 @@ private:
void InitializeStructuredCache(const ZenStorageServerConfig& ServerOptions);
void Flush();
- bool m_IsDedicatedMode = false;
- bool m_TestMode = false;
- bool m_DebugOptionForcedCrash = false;
- std::string m_StartupScrubOptions;
- CbObject m_RootManifest;
- std::filesystem::path m_DataRoot;
- std::filesystem::path m_ContentRoot;
- asio::steady_timer m_StateMarkerTimer{m_IoContext};
+ std::string m_StartupScrubOptions;
+ CbObject m_RootManifest;
+ asio::steady_timer m_StateMarkerTimer{m_IoContext};
void EnqueueStateMarkerTimer();
void CheckStateMarker();
@@ -95,6 +90,11 @@ private:
std::unique_ptr<HttpBuildStoreService> m_BuildStoreService;
std::unique_ptr<VfsService> m_VfsService;
std::unique_ptr<HttpAdminService> m_AdminService;
+ std::unique_ptr<HttpApiService> m_ApiService;
+
+#if ZEN_WITH_COMPUTE_SERVICES
+ std::unique_ptr<compute::HttpFunctionService> m_HttpFunctionService;
+#endif
};
struct ZenStorageServerConfigurator;
diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua
index 6ee80dc62..9ab51beb2 100644
--- a/src/zenserver/xmake.lua
+++ b/src/zenserver/xmake.lua
@@ -2,7 +2,11 @@
target("zenserver")
set_kind("binary")
+ if enable_unity then
+ add_rules("c++.unity_build", {batchsize = 4})
+ end
add_deps("zencore",
+ "zencompute",
"zenhttp",
"zennet",
"zenremotestore",
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 7f9bf56a9..7bf6126df 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -18,6 +18,7 @@
#include <zencore/sentryintegration.h>
#include <zencore/session.h>
#include <zencore/string.h>
+#include <zencore/system.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
@@ -145,6 +146,13 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState::
InitializeSecuritySettings(ServerOptions);
+ if (ServerOptions.LieCpu)
+ {
+ SetCpuCountForReporting(ServerOptions.LieCpu);
+
+ ZEN_INFO("Reporting concurrency: {}", ServerOptions.LieCpu);
+ }
+
m_StatusService.RegisterHandler("status", *this);
m_Http->RegisterService(m_StatusService);
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index efa46f361..5a8a079c0 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -43,6 +43,11 @@ public:
void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); }
+ void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
+ void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
+ void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
+ void SetTestMode(bool State) { m_TestMode = State; }
+
protected:
int Initialize(const ZenServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry);
void Finalize();
@@ -55,6 +60,10 @@ protected:
bool m_UseSentry = false;
bool m_IsPowerCycle = false;
+ bool m_IsDedicatedMode = false;
+ bool m_TestMode = false;
+ bool m_DebugOptionForcedCrash = false;
+
std::thread m_IoRunner;
asio::io_context m_IoContext;
void EnsureIoRunner();
@@ -72,6 +81,9 @@ protected:
std::function<void()> m_IsReadyFunc;
void OnReady();
+ std::filesystem::path m_DataRoot; // Root directory for server state
+ std::filesystem::path m_ContentRoot; // Root directory for frontend content
+
Ref<HttpServer> m_Http;
std::unique_ptr<IHttpRequestFilter> m_HttpRequestFilter;
@@ -114,7 +126,6 @@ protected:
private:
void InitializeSecuritySettings(const ZenServerConfig& ServerOptions);
};
-
class ZenServerMain
{
public:
diff --git a/src/zentest-appstub/xmake.lua b/src/zentest-appstub/xmake.lua
index 97615e322..db3ff2e2d 100644
--- a/src/zentest-appstub/xmake.lua
+++ b/src/zentest-appstub/xmake.lua
@@ -5,6 +5,9 @@ target("zentest-appstub")
set_group("tests")
add_headerfiles("**.h")
add_files("*.cpp")
+ add_deps("zencore")
+ add_packages("vcpkg::gsl-lite") -- this should ideally be propagated by the zencore dependency
+ add_packages("vcpkg::mimalloc")
if is_os("linux") then
add_syslinks("pthread")
diff --git a/src/zentest-appstub/zentest-appstub.cpp b/src/zentest-appstub/zentest-appstub.cpp
index 24cf21e97..926580d96 100644
--- a/src/zentest-appstub/zentest-appstub.cpp
+++ b/src/zentest-appstub/zentest-appstub.cpp
@@ -1,33 +1,408 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
+
+#if ZEN_WITH_TESTS
+# define ZEN_TEST_WITH_RUNNER 1
+# include <zencore/testing.h>
+#endif
+
+#include <fmt/format.h>
+
#include <stdio.h>
+#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <cstring>
+#include <filesystem>
+#include <string>
+#include <system_error>
#include <thread>
-using namespace std::chrono_literals;
+using namespace std::literals;
+using namespace zen;
+
+#if !defined(_MSC_VER)
+# define _strnicmp strncasecmp // TEMPORARY WORKAROUND - should not be using this
+#endif
+
+// Basic string transforms that serve as the test "compute" functions
+
+// Applies the ROT13 substitution to the ASCII letters 'a'-'z' and 'A'-'Z' of
+// InputString and returns the result as a new string. All other characters
+// are copied through unchanged.
+std::string
+Rot13Function(std::string_view InputString)
+{
+    // Rotates Letter by 13 positions within the 26-letter range starting at Base
+    const auto Rotate = [](char Letter, char Base) -> char { return static_cast<char>(Base + (Letter - Base + 13) % 26); };
+
+    std::string Rotated;
+    Rotated.reserve(InputString.size());
+
+    for (const char Ch : InputString)
+    {
+        if (Ch >= 'a' && Ch <= 'z')
+        {
+            Rotated.push_back(Rotate(Ch, 'a'));
+        }
+        else if (Ch >= 'A' && Ch <= 'Z')
+        {
+            Rotated.push_back(Rotate(Ch, 'A'));
+        }
+        else
+        {
+            Rotated.push_back(Ch);
+        }
+    }
+
+    return Rotated;
+}
+
+// Returns a new string holding the characters of InputString in reverse order.
+std::string
+ReverseFunction(std::string_view InputString)
+{
+    return std::string(InputString.rbegin(), InputString.rend());
+}
+
+// Returns an owning, unmodified copy of InputString.
+std::string
+IdentityFunction(std::string_view InputString)
+{
+    std::string Copy(InputString.begin(), InputString.end());
+    return Copy;
+}
+
+// Ignores its input and always returns an empty string.
+std::string
+NullFunction(std::string_view)
+{
+    return std::string();
+}
+
+// Builds a compact binary object describing this worker: a "BuildSystemVersion"
+// GUID followed by a "Functions" array with one { Name, Version } object per
+// supported test function.
+//
+// NOTE(review): main() writes its own inline copy of this table, in a different
+// order and with a different "Reverse" version GUID - confirm which of the two
+// is authoritative.
+zen::CbObject
+DescribeFunctions()
+{
+    struct FunctionInfo
+    {
+        std::string_view Name;
+        std::string_view Version;
+    };
+
+    // Entries are written to the "Functions" array in exactly this order
+    static constexpr FunctionInfo KnownFunctions[] = {
+        {"Null"sv, "00000000-0000-0000-0000-000000000000"sv},
+        {"Identity"sv, "11111111-1111-1111-1111-111111111111"sv},
+        {"Rot13"sv, "13131313-1313-1313-1313-131313131313"sv},
+        {"Reverse"sv, "31313131-3131-3131-3131-313131313131"sv},
+    };
+
+    CbObjectWriter Writer;
+    Writer << "BuildSystemVersion" << Guid::FromString("17fe280d-ccd8-4be8-a9d1-89c944a70969"sv);
+
+    Writer.BeginArray("Functions"sv);
+    for (const FunctionInfo& Info : KnownFunctions)
+    {
+        Writer.BeginObject();
+        Writer << "Name"sv << Info.Name;
+        Writer << "Version"sv << Guid::FromString(Info.Version);
+        Writer.EndObject();
+    }
+    Writer.EndArray();
+
+    return Writer.Save();
+}
+
+// Resolves action input chunks from files on disk. A chunk is looked up as a
+// file under InputsRoot named after the hex string of its hash.
+struct ContentResolver
+{
+    // Directory that holds the chunk files
+    std::filesystem::path InputsRoot;
+
+    // Loads the chunk file for Hash and validates the raw hash and raw size
+    // produced by CompressedBuffer::FromCompressed against Hash and ExpectedSize.
+    //
+    // Returns the chunk as a CompressedBuffer.
+    // Throws std::runtime_error when the raw size or the raw hash does not match.
+    CompressedBuffer ResolveChunk(IoHash Hash, uint64_t ExpectedSize)
+    {
+        std::filesystem::path ChunkPath = InputsRoot / Hash.ToHexString();
+        IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkPath);
+
+        IoHash RawHash;
+        uint64_t RawSize = 0;
+        CompressedBuffer AsCompressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkBuffer), RawHash, RawSize);
+
+        if (RawSize != ExpectedSize)
+        {
+            // Report RawSize - the value actually compared against ExpectedSize -
+            // rather than the size of the file buffer that was read from disk
+            throw std::runtime_error(
+                fmt::format("chunk size mismatch - expected {}, got {} for '{}'", ExpectedSize, RawSize, ChunkPath));
+        }
+        if (RawHash != Hash)
+        {
+            throw std::runtime_error(fmt::format("chunk hash mismatch - expected {}, got {} for '{}'", Hash, RawHash, ChunkPath));
+        }
+
+        return AsCompressed;
+    }
+};
+
+// Executes the test function named by the action's "Function" field.
+//
+// The input chunk is identified by the "RawHash"/"RawSize" fields of the
+// action's Inputs/Source object and is fetched through ChunkResolver. The
+// selected string transform is applied to the decompressed input and the
+// result is compressed again.
+//
+// Returns a package whose object contains a single-entry "Values" array
+// (Id, RawHash attachment reference, RawSize) with the compressed output added
+// as an attachment. An unrecognized function name yields an empty package.
+zen::CbPackage
+ExecuteFunction(CbObject Action, ContentResolver ChunkResolver)
+{
+    using TransformFunc = std::string (*)(std::string_view);
+
+    struct NamedTransform
+    {
+        std::string_view Name;
+        TransformFunc    Func;
+    };
+
+    static constexpr NamedTransform Transforms[] = {
+        {"Rot13"sv, Rot13Function},
+        {"Reverse"sv, ReverseFunction},
+        {"Identity"sv, IdentityFunction},
+        {"Null"sv, NullFunction},
+    };
+
+    std::string_view RequestedName = Action["Function"sv].AsString();
+
+    TransformFunc Selected = nullptr;
+    for (const NamedTransform& Candidate : Transforms)
+    {
+        if (Candidate.Name == RequestedName)
+        {
+            Selected = Candidate.Func;
+            break;
+        }
+    }
+
+    // Unknown function: nothing is resolved or executed
+    if (Selected == nullptr)
+    {
+        return {};
+    }
+
+    // Locate and load the input chunk
+    auto SourceView = Action["Inputs"sv].AsObjectView()["Source"sv].AsObjectView();
+
+    IoHash   SourceHash = SourceView["RawHash"sv].AsHash();
+    uint64_t SourceSize = SourceView["RawSize"sv].AsUInt64();
+
+    zen::CompressedBuffer SourceData = ChunkResolver.ResolveChunk(SourceHash, SourceSize);
+    SharedBuffer          SourceRaw  = SourceData.Decompress();
+
+    // Run the transform and compress its output
+    std::string Output = Selected(std::string_view(static_cast<const char*>(SourceRaw.GetData()), SourceRaw.GetSize()));
+
+    zen::CompressedBuffer Compressed =
+        zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Output), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4);
+    IoHash CompressedRawHash = Compressed.DecodeRawHash();
+
+    CbAttachment OutputAttachment(std::move(Compressed), CompressedRawHash);
+
+    // Describe the output value in the result object
+    CbObjectWriter ResultWriter;
+    ResultWriter.BeginArray("Values"sv);
+    ResultWriter.BeginObject();
+    ResultWriter << "Id" << Oid{1, 2, 3};
+    ResultWriter.AddAttachment("RawHash", OutputAttachment);
+    ResultWriter << "RawSize" << Output.size();
+    ResultWriter.EndObject();
+    ResultWriter.EndArray();
+
+    zen::CbPackage Package;
+    Package.SetObject(ResultWriter.Save());
+    Package.AddAttachment(std::move(OutputAttachment));
+    return Package;
+}
+
+/* This implements a minimal application to help testing of process launch-related
+ functionality
+
+ It also mimics the DDC2 worker command line interface, so it may be used to
+ exercise compute infrastructure.
+ */
int
main(int argc, char* argv[])
{
int ExitCode = 0;
- for (int i = 0; i < argc; ++i)
+ try
{
- if (std::strncmp(argv[i], "-t=", 3) == 0)
+ std::filesystem::path BasePath = std::filesystem::current_path();
+ std::filesystem::path InputPath = std::filesystem::current_path() / "Inputs";
+ std::filesystem::path OutputPath = std::filesystem::current_path() / "Outputs";
+ std::filesystem::path VersionPath = std::filesystem::current_path() / "Versions";
+ std::vector<std::filesystem::path> ActionPaths;
+
+ /*
+ GetSwitchValues(TEXT("-B="), ActionPathPatterns);
+ GetSwitchValues(TEXT("-Build="), ActionPathPatterns);
+
+ GetSwitchValues(TEXT("-I="), InputDirectoryPaths);
+ GetSwitchValues(TEXT("-Input="), InputDirectoryPaths);
+
+ GetSwitchValues(TEXT("-O="), OutputDirectoryPaths);
+ GetSwitchValues(TEXT("-Output="), OutputDirectoryPaths);
+
+ GetSwitchValues(TEXT("-V="), VersionPaths);
+ GetSwitchValues(TEXT("-Version="), VersionPaths);
+ */
+
+ auto SplitArg = [](const char* Arg) -> std::string_view {
+ std::string_view ArgView{Arg};
+ if (auto SplitPos = ArgView.find_first_of('='); SplitPos != std::string_view::npos)
+ {
+ return ArgView.substr(SplitPos + 1);
+ }
+ else
+ {
+ return {};
+ }
+ };
+
+ auto ParseIntArg = [](std::string_view Arg) -> int {
+ int Rv = 0;
+ const auto Result = std::from_chars(Arg.data(), Arg.data() + Arg.size(), Rv);
+
+ if (Result.ec != std::errc{})
+ {
+ throw std::invalid_argument(fmt::format("bad argument (not an integer): {}", Arg).c_str());
+ }
+
+ return Rv;
+ };
+
+ for (int i = 1; i < argc; ++i)
+ {
+ std::string_view Arg = argv[i];
+
+ if (Arg.compare(0, 1, "-"))
+ {
+ continue;
+ }
+
+ if (std::strncmp(argv[i], "-t=", 3) == 0)
+ {
+ const int SleepTime = std::atoi(argv[i] + 3);
+
+ printf("[zentest] sleeping for %ds...\n", SleepTime);
+
+ std::this_thread::sleep_for(SleepTime * 1s);
+ }
+ else if (std::strncmp(argv[i], "-f=", 3) == 0)
+ {
+ // Force a "failure" process exit code to return to the invoker
+
+ // This may throw for invalid arguments, which makes this useful for
+ // testing exception handling
+ std::string_view ErrorArg = SplitArg(argv[i]);
+ ExitCode = ParseIntArg(ErrorArg);
+ }
+ else if ((_strnicmp(argv[i], "-input=", 7) == 0) || (_strnicmp(argv[i], "-i=", 3) == 0))
+ {
+ /* mimic DDC2
+
+ GetSwitchValues(TEXT("-I="), InputDirectoryPaths);
+ GetSwitchValues(TEXT("-Input="), InputDirectoryPaths);
+ */
+
+ std::string_view InputArg = SplitArg(argv[i]);
+ InputPath = InputArg;
+ }
+ else if ((_strnicmp(argv[i], "-output=", 8) == 0) || (_strnicmp(argv[i], "-o=", 3) == 0))
+ {
+ /* mimic DDC2 handling of where files storing output chunk files are directed
+
+ GetSwitchValues(TEXT("-O="), OutputDirectoryPaths);
+ GetSwitchValues(TEXT("-Output="), OutputDirectoryPaths);
+ */
+
+ std::string_view OutputArg = SplitArg(argv[i]);
+ OutputPath = OutputArg;
+ }
+ else if ((_strnicmp(argv[i], "-version=", 8) == 0) || (_strnicmp(argv[i], "-v=", 3) == 0))
+ {
+ /* mimic DDC2
+
+ GetSwitchValues(TEXT("-V="), VersionPaths);
+ GetSwitchValues(TEXT("-Version="), VersionPaths);
+ */
+
+ std::string_view VersionArg = SplitArg(argv[i]);
+ VersionPath = VersionArg;
+ }
+ else if ((_strnicmp(argv[i], "-build=", 7) == 0) || (_strnicmp(argv[i], "-b=", 3) == 0))
+ {
+ /* mimic DDC2
+
+ GetSwitchValues(TEXT("-B="), ActionPathPatterns);
+ GetSwitchValues(TEXT("-Build="), ActionPathPatterns);
+ */
+
+ std::string_view BuildActionArg = SplitArg(argv[i]);
+ std::filesystem::path ActionPath{BuildActionArg};
+ ActionPaths.push_back(ActionPath);
+
+ ExitCode = 0;
+ }
+ }
+
+ // Emit version information
+
+ if (!VersionPath.empty())
{
- const int SleepTime = std::atoi(argv[i] + 3);
+ CbObjectWriter Version;
+
+ Version << "BuildSystemVersion" << Guid::FromString("17fe280d-ccd8-4be8-a9d1-89c944a70969"sv);
+
+ Version.BeginArray("Functions");
+
+ Version.BeginObject();
+ Version << "Name"
+ << "Rot13"
+ << "Version" << Guid::FromString("13131313-1313-1313-1313-131313131313"sv);
+ Version.EndObject();
- printf("[zentest] sleeping for %ds...\n", SleepTime);
+ Version.BeginObject();
+ Version << "Name"
+ << "Reverse"
+ << "Version" << Guid::FromString("98765432-1000-0000-0000-000000000000"sv);
+ Version.EndObject();
- std::this_thread::sleep_for(SleepTime * 1s);
+ Version.BeginObject();
+ Version << "Name"
+ << "Identity"
+ << "Version" << Guid::FromString("11111111-1111-1111-1111-111111111111"sv);
+ Version.EndObject();
+
+ Version.BeginObject();
+ Version << "Name"
+ << "Null"
+ << "Version" << Guid::FromString("00000000-0000-0000-0000-000000000000"sv);
+ Version.EndObject();
+
+ Version.EndArray();
+ CbObject VersionObject = Version.Save();
+
+ BinaryWriter Writer;
+ zen::SaveCompactBinary(Writer, VersionObject);
+ zen::WriteFile(VersionPath, IoBufferBuilder::MakeFromMemory(Writer.GetView()));
}
- else if (std::strncmp(argv[i], "-f=", 3) == 0)
+
+ // Evaluate actions
+
+ ContentResolver Resolver;
+ Resolver.InputsRoot = InputPath;
+
+ for (std::filesystem::path ActionPath : ActionPaths)
{
- ExitCode = std::atoi(argv[i] + 3);
+ IoBuffer ActionDescBuffer = ReadFile(ActionPath).Flatten();
+ CbObject ActionDesc = LoadCompactBinaryObject(ActionDescBuffer);
+ CbPackage Result = ExecuteFunction(ActionDesc, Resolver);
+ CbObject ResultObject = Result.GetObject();
+
+ BinaryWriter Writer;
+ zen::SaveCompactBinary(Writer, ResultObject);
+ zen::WriteFile(ActionPath.replace_extension(".output"), IoBufferBuilder::MakeFromMemory(Writer.GetView()));
+
+ // Also marshal outputs
+
+ for (const auto& Attachment : Result.GetAttachments())
+ {
+ const CompositeBuffer& AttachmentBuffer = Attachment.AsCompressedBinary().GetCompressed();
+ zen::WriteFile(OutputPath / Attachment.GetHash().ToHexString(), AttachmentBuffer.Flatten().AsIoBuffer());
+ }
}
}
+ catch (std::exception& Ex)
+ {
+ printf("[zentest] exception caught in main: '%s'\n", Ex.what());
+
+ ExitCode = 99;
+ }
printf("[zentest] exiting with exit code: %d\n", ExitCode);