diff options
54 files changed, 8019 insertions, 100 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c3d3c970..9fa4fe031 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ - Improvement: Web UI shows both hard and soft package dependencies - Improvement: Web UI presents ops with only files without resorting to json representation - Improvement: Web UI offers a cook artifacts view to present cook dependencies +- Bugfix: If a corrupted block (or partial block) is downloaded, handle it gracefully and end the download instead of causing an assert ## 5.7.20 - Improvement: When validating cache records read from disk we now do a limited validation of the payload to reduce overhead diff --git a/docs/compute.md b/docs/compute.md new file mode 100644 index 000000000..417622f94 --- /dev/null +++ b/docs/compute.md @@ -0,0 +1,152 @@ +# DDC compute interface design documentation + +This is a work in progress + +## General architecture + +The Zen server compute interfaces implement a basic model for distributing compute processes. +Clients can implement [Functions](#functions) in [worker executables](#workers) and dispatch +[actions](#actions) to them via a message based interface. + +The API requires users to describe the actions and the workers explicitly and fully up front and the +work is described and submitted as singular objects to the compute service. The model somewhat +resembles Lambda and other stateless compute services but is more tightly constrained to allow +for optimizations and to integrate tightly with the storage components in Zen server. + +This is in contrast with Unreal Build Accelerator, in which the worker (remote process) +and the inputs are discovered on-the-fly as the worker progresses and inputs and results +are communicated via relatively high-frequency RPCs. + +### Actions + +An action is described by an action descriptor, which is a compact binary object which +contains a self-contained description of the inputs and the function which should be applied +to generate an output. 
+ +#### Sample Action Descriptor + +``` +work item 4857714dee2383b50b2e7d72afd79848ab5d13f8 (2 attachments): +Function: CompileShaderJobs +FunctionVersion: '83027356-2cf7-41ca-aba5-c81ab0ff2129' +BuildSystemVersion: '17fe280d-ccd8-4be8-a9d1-89c944a70969' +Inputs: + Input: + RawHash: 0c01d9f19033256ca974fced523d1e15b27c1b0a + RawSize: 4482 + Virtual0: + RawHash: dd9bbcb8763badd2f015f94f8f6e360362e2bce0 + RawSize: 3334 +``` + +### Functions + +Functions are identified by a name, and a version specification. For +matching purposes there's also a build system version specification. +When workers are registered with the compute service, they are entered +into a table and as actions stream in the compute subsystem will try to +find a worker which implements the required function using the +`[Function,FunctionVersion,BuildSystemVersion]` tuple. In practice there +may be more than one matching worker and it's up to the compute service +to pick one. + +``` +=== Known functions =========================== +function version build system worker id +CompileShaderJobs 83027356-2cf7-41ca-aba5-c81ab0ff2129 17fe280d-ccd8-4be8-a9d1-89c944a70969 69cb9bb50e9600b5bd5e5ca4ba0f9187b118069a +``` + +### Workers + +A worker is an executable which accepts some command line options which are used to pass the +information required to execute an action. There are two modes, one legacy mode which is +file-based and a streaming mode. + +In the file-based mode the option is simply `-Build=<action file>` which points to an action +descriptor in compact binary format (see above). By convention, the referenced inputs are in a folder +named `Inputs` where any input blobs are stored as `CompressedBuffer`-format files named +after the `IoHash` of the uncompressed contents. + +In the streaming mode, the data is provided through a streaming socket interface instead +of using the file system. This eliminates process spawning overheads and enables intra-process +pipelining for greater efficiency. 
The streaming mode is not yet implemented fully. + +### Worker Descriptors + +Workers are declared by passing a worker descriptor to the compute service. The descriptor +contains information about which executable files are required to execute the worker and how +they need to be laid out. You can optionally also provide additional non-executable files to +go along with the executables. + +The descriptor also lists the functions implemented by the worker. Each function defines +a version which is used when matching actions (the function version is passed in as the +`FunctionVersion` in the action descriptor). + +Each worker links in a small set of common support code which is used to handle the +communication with the invoking program (the 'build system'). To be able to evolve this +interface, each worker also indicates the version of the build system using the +`BuildSystemVersion` attribute. + +#### Sample Worker Descriptor + +``` +worker 69cb9bb50e9600b5bd5e5ca4ba0f9187b118069a: +name: ShaderBuildWorker +path: Engine/Binaries/Win64/ShaderBuildWorker.exe +host: Win64 +buildsystem_version: '17fe280d-ccd8-4be8-a9d1-89c944a70969' +timeout: 300 +cores: 1 +environment: [] +executables: + - name: 'Engine/Binaries/Win64/ShaderBuildWorker-DerivedDataBuildWorker.dll' + hash: f4dbec80e549bae2916288f1b9428c2878d9ae7a + size: 166912 + - name: 'Engine/Binaries/Win64/ShaderBuildWorker-DerivedDataCache.dll' + hash: 8025d561ede05db19b235fc2ef290e2b029c1b8c + size: 4339200 + - name: Engine/Binaries/Win64/ShaderBuildWorker.exe + hash: b85862fca2ce04990470f27bae9ead7f31d9b27e + size: 60928 + - name: Engine/Binaries/Win64/ShaderBuildWorker.modules + hash: 7b05741a69a2ea607c5578668a8de50b04259668 + size: 3739 + - name: Engine/Binaries/Win64/ShaderBuildWorker.version + hash: 8fdfd9f825febf2191b555393e69b32a1d78c24f + size: 259 +files: [] +dirs: + - Engine/Binaries/Win64 +functions: + - name: CompileShaderJobs + version: '83027356-2cf7-41ca-aba5-c81ab0ff2129' +``` + +## API (WIP not 
final) + +The compute interfaces are currently exposed on the `/apply` endpoint but this +will be subject to change as we adapt the interfaces during development. The LSN +APIs below are intended to replace the action ID oriented APIs. + +The POST APIs typically involve a two-step dance where a descriptor is POSTed and +the service responds with a list of `needs` chunks (identified via `IoHash`) which +it does not have yet. The client can then follow up with a POST of a Compact Binary +Package containing the descriptor along with the needed chunks. + +`/apply/ready` - health check endpoint returns HTTP 200 OK or HTTP 503 + +`/apply/sysinfo` - system information endpoint + +`/apply/record/start`, `/apply/record/stop` - start/stop action recording + +`/apply/workers/{worker}` - GET/POST worker descriptors and payloads + +`/apply/jobs/completed` - GET list of completed actions + +`/apply/jobs/{lsn}` - GET completed action results from LSN, POST action cancellation by LSN, priority changes by LSN + +`/apply/jobs/{worker}/{action}` - GET completed action (job) results by action ID + +`/apply/jobs/{worker}` - GET pending/running jobs for worker, POST requests to schedule action as a job + +`/apply/jobs` - POST request to schedule action as a job diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp new file mode 100644 index 000000000..2d9d0d12e --- /dev/null +++ b/src/zen/cmds/exec_cmd.cpp @@ -0,0 +1,654 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "exec_cmd.h" + +#include <zencompute/functionservice.h> +#include <zencompute/recordingreader.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/string.h> +#include <zencore/timer.h> + +#include <EASTL/hash_map.h> +#include <EASTL/hash_set.h> +#include <EASTL/map.h> + +using namespace std::literals; + +namespace eastl { + +template<> +struct hash<zen::IoHash> : public zen::IoHash::Hasher +{ +}; + +} // namespace eastl + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen { + +ExecCommand::ExecCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), "<hosturl>"); + m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), "<path>"); + m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), "<offset>"); + m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), "<stride>"); + m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), "<limit>"); + m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), "<path>"); + m_Options.add_option("", + "", + "mode", + "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", + cxxopts::value(m_Mode)->default_value("http"), + "<string>"); + m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), "<bool>"); + 
m_Options.parse_positional("mode"); +} + +ExecCommand::~ExecCommand() +{ +} + +void +ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + // Configure + + if (!ParseOptions(argc, argv)) + { + return; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_RecordingPath.empty()) + { + throw OptionParseException("replay path is required!", m_Options.help()); + } + + m_VerboseLogging = GlobalOptions.IsVerbose; + m_QuietLogging = m_Quiet && !m_VerboseLogging; + + enum ExecMode + { + kHttp, + kDirect, + kInproc, + kDump, + kBeacon, + kBuildLog + } Mode; + + if (m_Mode == "http"sv) + { + Mode = kHttp; + } + else if (m_Mode == "direct"sv) + { + Mode = kDirect; + } + else if (m_Mode == "inproc"sv) + { + Mode = kInproc; + } + else if (m_Mode == "dump"sv) + { + Mode = kDump; + } + else if (m_Mode == "beacon"sv) + { + Mode = kBeacon; + } + else if (m_Mode == "buildlog"sv) + { + Mode = kBuildLog; + } + else + { + throw OptionParseException("invalid mode specified!", m_Options.help()); + } + + // Gather information from recording path + + std::unique_ptr<zen::compute::RecordingReader> Reader; + std::unique_ptr<zen::compute::UeRecordingReader> UeReader; + + std::filesystem::path RecordingPath{m_RecordingPath}; + + if (!std::filesystem::is_directory(RecordingPath)) + { + throw OptionParseException("replay path should be a directory path!", m_Options.help()); + } + else + { + if (std::filesystem::is_directory(RecordingPath / "cid")) + { + Reader = std::make_unique<zen::compute::RecordingReader>(RecordingPath); + m_WorkerMap = Reader->ReadWorkers(); + m_ChunkResolver = Reader.get(); + m_RecordingReader = Reader.get(); + } + else + { + UeReader = std::make_unique<zen::compute::UeRecordingReader>(RecordingPath); + m_WorkerMap = UeReader->ReadWorkers(); + m_ChunkResolver = UeReader.get(); + m_RecordingReader = UeReader.get(); + } + } + + ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); + + 
for (auto& Kv : m_WorkerMap) + { + CbObject WorkerDesc = Kv.second.GetObject(); + const IoHash& WorkerId = Kv.first; + + RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); + + if (m_VerboseLogging) + { + zen::ExtendableStringBuilder<1024> ObjStr; +# if 0 + zen::CompactBinaryToJson(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); +# else + zen::CompactBinaryToYaml(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); +# endif + } + } + + if (m_VerboseLogging) + { + EmitFunctionList(m_FunctionList); + } + + // Iterate over work items and dispatch or log them + + int ReturnValue = 0; + + Stopwatch ExecTimer; + + switch (Mode) + { + case kHttp: + // Forward requests to HTTP function service + ReturnValue = HttpExecute(); + break; + + case kDirect: + // Not currently supported + ReturnValue = LocalMessagingExecute(); + break; + + case kInproc: + // Handle execution in-core (by spawning child processes) + ReturnValue = InProcessExecute(); + break; + + case kDump: + // Dump high level information about actions to console + ReturnValue = DumpWorkItems(); + break; + + case kBeacon: + ReturnValue = BeaconExecute(); + break; + + case kBuildLog: + ReturnValue = BuildActionsLog(); + break; + + default: + ZEN_ERROR("Unknown operating mode! 
No work submitted"); + + ReturnValue = 1; + } + + ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); + + if (!ReturnValue) + { + ZEN_CONSOLE("all work items completed successfully"); + } + else + { + ZEN_CONSOLE("some work items failed (code {})", ReturnValue); + } +} + +int +ExecCommand::InProcessExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + FunctionSession.AddLocalRunner(Resolver, TempPath); + + return ExecUsingSession(FunctionSession); +} + +int +ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession) +{ + struct JobTracker + { + public: + inline void Insert(int LsnField) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.insert(LsnField); + } + + inline bool IsEmpty() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.empty(); + } + + inline void Remove(int CompleteLsn) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.erase(CompleteLsn); + } + + inline size_t GetSize() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.size(); + } + + private: + mutable RwLock Lock; + std::unordered_set<int> PendingJobs; + }; + + JobTracker PendingJobs; + + std::atomic<int> IsDraining{0}; + + auto DrainCompletedJobs = [&] { + if (IsDraining.exchange(1)) + { + return; + } + + auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); + + CbObjectWriter Cbo; + FunctionSession.GetCompleted(Cbo); + + if (CbObject Completed = Cbo.Save()) + { + for (auto& It : Completed["completed"sv]) + { + int32_t CompleteLsn = It.AsInt32(); + + CbPackage ResultPackage; + HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); + + if (Response == HttpResponseCode::OK) + { + PendingJobs.Remove(CompleteLsn); + + ZEN_CONSOLE("completed: LSN {} ({} still 
pending)", CompleteLsn, PendingJobs.GetSize()); + } + } + } + }; + + // Describe workers + + ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); + + for (auto Kv : m_WorkerMap) + { + CbPackage WorkerDesc = Kv.second; + + FunctionSession.RegisterWorker(WorkerDesc); + } + + // Then submit work items + + int FailedWorkCounter = 0; + size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); + int SubmittedWorkItems = 0; + + ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + + int OffsetCounter = m_Offset; + int StrideCounter = m_Stride; + + auto ShouldSchedule = [&]() -> bool { + if (m_Limit && SubmittedWorkItems >= m_Limit) + { + // Limit reached, ignore + + return false; + } + + if (OffsetCounter && OffsetCounter--) + { + // Still in offset, ignore + + return false; + } + + if (--StrideCounter == 0) + { + StrideCounter = m_Stride; + + return true; + } + + return false; + }; + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + // Enqueue job + + Stopwatch SubmitTimer; + + const int Priority = 0; + + if (ShouldSchedule()) + { + if (m_VerboseLogging) + { + int AttachmentCount = 0; + uint64_t AttachmentBytes = 0; + eastl::hash_set<IoHash> ReferencedChunks; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsAttachment(); + + ReferencedChunks.insert(AttachData); + ++AttachmentCount; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) + { + AttachmentBytes += ChunkData.GetSize(); + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", + ActionId, + AttachmentCount, + NiceBytes(AttachmentBytes), + ObjStr); + } + + if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult = + FunctionSession.EnqueueAction(ActionObject, Priority)) + { + const int32_t LsnField = EnqueueResult.Lsn; + + --RemainingWorkItems; + 
++SubmittedWorkItems; + + if (!m_QuietLogging) + { + ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", + SubmittedWorkItems, + LsnField, + NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), + RemainingWorkItems); + } + + PendingJobs.Insert(LsnField); + } + else + { + if (!m_QuietLogging) + { + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + ZEN_ERROR( + "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " + "descriptor " + "at: 'file://{}'", + std::string(FunctionName), + FunctionVersion, + BuildSystemVersion, + "<null>"); + + EmitFunctionListOnce(m_FunctionList); + } + + ++FailedWorkCounter; + } + } + + // Check for completed work + + DrainCompletedJobs(); + }, + 8); + + // Wait until all pending work is complete + + while (!PendingJobs.IsEmpty()) + { + // TODO: improve this logic + zen::Sleep(500); + + DrainCompletedJobs(); + } + + if (FailedWorkCounter) + { + return 1; + } + + return 0; +} + +int +ExecCommand::LocalMessagingExecute() +{ + // Non-HTTP work submission path + + // To be reimplemented using final transport + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::HttpExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.AddRemoteRunner(Resolver, TempPath, m_HostName); + + return ExecUsingSession(FunctionSession); +} + +int +ExecCommand::BeaconExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession 
FunctionSession(Resolver); + FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); + // FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://10.99.9.246:8558"); + + return ExecUsingSession(FunctionSession); +} + +////////////////////////////////////////////////////////////////////////// + +void +ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) +{ + const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerDesc["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } +} + +void +ExecCommand::EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList) +{ + if (m_FunctionListEmittedOnce == false) + { + EmitFunctionList(FunctionList); + + m_FunctionListEmittedOnce = true; + } +} + +int +ExecCommand::DumpWorkItems() +{ + std::atomic<int> EmittedCount{0}; + + eastl::hash_map<IoHash, uint64_t> SeenAttachments; // Attachment CID -> count of references + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + eastl::hash_map<IoHash, CompressedBuffer> Attachments; + + uint64_t AttachmentBytes = 0; + uint64_t UncompressedAttachmentBytes = 0; + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + 
Attachments[AttachmentCid] = CompressedData; + + AttachmentBytes += CompressedData.GetCompressedSize(); + UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); + + if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) + { + ++Iter->second; + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + +# if 0 + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); +# else + zen::CompactBinaryToYaml(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", + ActionId, + Attachments.size(), + AttachmentBytes, + UncompressedAttachmentBytes, + ObjStr); +# endif + + ++EmittedCount; + }, + 1); + + ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); + + eastl::map<uint64_t, std::vector<IoHash>> ReferenceHistogram; + + for (const auto& [K, V] : SeenAttachments) + { + if (V > 1) + { + ReferenceHistogram[V].push_back(K); + } + } + + for (const auto& [RefCount, Cids] : ReferenceHistogram) + { + ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); + } + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::BuildActionsLog() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + if (m_RecordingPath.empty()) + { + throw OptionParseException("need to specify recording path", m_Options.help()); + } + + if (std::filesystem::exists(m_RecordingLogPath)) + { + throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help()); + } + + ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.StartRecording(Resolver, m_RecordingLogPath); + + return ExecUsingSession(FunctionSession); +} + +void 
+ExecCommand::EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList) +{ + ZEN_CONSOLE("=== Known functions:\n==========================="); + + ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); + + for (const FunctionDefinition& Func : FunctionList) + { + ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); + } + + ZEN_CONSOLE("==========================="); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zen/cmds/exec_cmd.h b/src/zen/cmds/exec_cmd.h new file mode 100644 index 000000000..43d092144 --- /dev/null +++ b/src/zen/cmds/exec_cmd.h @@ -0,0 +1,97 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +#include <zencompute/recordingreader.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/guid.h> +#include <zencore/iohash.h> + +#include <filesystem> +#include <functional> +#include <unordered_map> + +namespace zen { +class CbPackage; +class CbObject; +struct IoHash; +class ChunkResolver; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { +class FunctionServiceSession; +} + +namespace zen { + +/** + * Zen CLI command for executing functions from a recording + * + * Mostly for testing and debugging purposes + */ + +class ExecCommand : public ZenCmdBase +{ +public: + ExecCommand(); + ~ExecCommand(); + + static constexpr char Name[] = "exec"; + static constexpr char Description[] = "Execute functions from a recording"; + + virtual void Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{Name, Description}; + std::string m_HostName; + std::filesystem::path m_BeaconPath; + std::filesystem::path m_RecordingPath; + std::filesystem::path m_RecordingLogPath; + int m_Offset = 0; + int m_Stride = 1; + int 
m_Limit = 0; + bool m_Quiet = false; + std::string m_Mode{"http"}; + + struct FunctionDefinition + { + std::string FunctionName; + zen::Guid FunctionVersion; + zen::Guid BuildSystemVersion; + zen::IoHash WorkerId; + }; + + bool m_FunctionListEmittedOnce = false; + void EmitFunctionListOnce(const std::vector<FunctionDefinition>& FunctionList); + void EmitFunctionList(const std::vector<FunctionDefinition>& FunctionList); + + std::unordered_map<zen::IoHash, zen::CbPackage> m_WorkerMap; + std::vector<FunctionDefinition> m_FunctionList; + bool m_VerboseLogging = false; + bool m_QuietLogging = false; + + zen::ChunkResolver* m_ChunkResolver = nullptr; + zen::compute::RecordingReaderBase* m_RecordingReader = nullptr; + + void RegisterWorkerFunctionsFromDescription(const zen::CbObject& WorkerDesc, const zen::IoHash& WorkerId); + + int ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession); + + // Execution modes + + int DumpWorkItems(); + int HttpExecute(); + int InProcessExecute(); + int LocalMessagingExecute(); + int BeaconExecute(); + int BuildActionsLog(); +}; + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zen/xmake.lua b/src/zen/xmake.lua index ab094fef3..f889c3296 100644 --- a/src/zen/xmake.lua +++ b/src/zen/xmake.lua @@ -6,15 +6,12 @@ target("zen") add_files("**.cpp") add_files("zen.cpp", {unity_ignored = true }) add_deps("zencore", "zenhttp", "zenremotestore", "zenstore", "zenutil") + add_deps("zencompute", "zennet") add_deps("cxxopts", "fmt") add_packages("json11") add_includedirs(".") set_symbols("debug") - if is_mode("release") then - set_optimize("fastest") - end - if is_plat("windows") then add_files("zen.rc") add_ldflags("/subsystem:console,5.02") diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 25245c3d2..018f77738 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -11,6 +11,7 @@ #include "cmds/cache_cmd.h" #include "cmds/copy_cmd.h" #include "cmds/dedup_cmd.h" +#include "cmds/exec_cmd.h" #include 
"cmds/info_cmd.h" #include "cmds/print_cmd.h" #include "cmds/projectstore_cmd.h" @@ -316,22 +317,25 @@ main(int argc, char** argv) } #endif // ZEN_WITH_TRACE - AttachCommand AttachCmd; - BenchCommand BenchCmd; - BuildsCommand BuildsCmd; - CacheDetailsCommand CacheDetailsCmd; - CacheGetCommand CacheGetCmd; - CacheGenerateCommand CacheGenerateCmd; - CacheInfoCommand CacheInfoCmd; - CacheStatsCommand CacheStatsCmd; - CopyCommand CopyCmd; - CopyStateCommand CopyStateCmd; - CreateOplogCommand CreateOplogCmd; - CreateProjectCommand CreateProjectCmd; - DedupCommand DedupCmd; - DownCommand DownCmd; - DropCommand DropCmd; - DropProjectCommand ProjectDropCmd; + AttachCommand AttachCmd; + BenchCommand BenchCmd; + BuildsCommand BuildsCmd; + CacheDetailsCommand CacheDetailsCmd; + CacheGetCommand CacheGetCmd; + CacheGenerateCommand CacheGenerateCmd; + CacheInfoCommand CacheInfoCmd; + CacheStatsCommand CacheStatsCmd; + CopyCommand CopyCmd; + CopyStateCommand CopyStateCmd; + CreateOplogCommand CreateOplogCmd; + CreateProjectCommand CreateProjectCmd; + DedupCommand DedupCmd; + DownCommand DownCmd; + DropCommand DropCmd; + DropProjectCommand ProjectDropCmd; +#if ZEN_WITH_COMPUTE_SERVICES + ExecCommand ExecCmd; +#endif // ZEN_WITH_COMPUTE_SERVICES ExportOplogCommand ExportOplogCmd; FlushCommand FlushCmd; GcCommand GcCmd; @@ -388,6 +392,9 @@ main(int argc, char** argv) {"dedup", &DedupCmd, "Dedup files"}, {"down", &DownCmd, "Bring zen server down"}, {"drop", &DropCmd, "Drop cache namespace or bucket"}, +#if ZEN_WITH_COMPUTE_SERVICES + {ExecCommand::Name, &ExecCmd, ExecCommand::Description}, +#endif {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, {"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"}, {"gc", &GcCmd, "Garbage collect zen storage"}, diff --git a/src/zencompute-test/xmake.lua b/src/zencompute-test/xmake.lua new file mode 100644 index 000000000..64a3c7703 --- /dev/null +++ b/src/zencompute-test/xmake.lua @@ -0,0 +1,9 
@@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target("zencompute-test") + set_kind("binary") + set_group("tests") + add_headerfiles("**.h") + add_files("*.cpp") + add_deps("zencompute", "zencore") + add_packages("vcpkg::doctest") diff --git a/src/zencompute-test/zencompute-test.cpp b/src/zencompute-test/zencompute-test.cpp new file mode 100644 index 000000000..237812e12 --- /dev/null +++ b/src/zencompute-test/zencompute-test.cpp @@ -0,0 +1,32 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencompute/zencompute.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/zencore.h> + +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# include <sys/time.h> +# include <sys/resource.h> +# include <zencore/except.h> +#endif + +#if ZEN_WITH_TESTS +# define ZEN_TEST_WITH_RUNNER 1 +# include <zencore/testing.h> +#endif + +int +main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) +{ +#if ZEN_WITH_TESTS + zen::zencompute_forcelinktests(); + + zen::logging::InitializeLogging(); + zen::MaximizeOpenFileCount(); + + return ZEN_RUN_TESTS(argc, argv); +#else + return 0; +#endif +} diff --git a/src/zencompute/actionrecorder.cpp b/src/zencompute/actionrecorder.cpp new file mode 100644 index 000000000..04c4b5141 --- /dev/null +++ b/src/zencompute/actionrecorder.cpp @@ -0,0 +1,258 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "actionrecorder.h" + +#include "functionrunner.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# define ZEN_CONCRT_AVAILABLE 1 +#else +# define ZEN_CONCRT_AVAILABLE 0 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RecordingFileWriter::RecordingFileWriter() +{ +} + +RecordingFileWriter::~RecordingFileWriter() +{ + Close(); +} + +void +RecordingFileWriter::Open(std::filesystem::path FilePath) +{ + using namespace std::literals; + + m_File.Open(FilePath, BasicFile::Mode::kTruncate); + m_File.Write("----DDC2----DATA", 16, 0); + m_FileOffset = 16; + + std::filesystem::path TocPath = FilePath.replace_extension(".ztoc"); + m_TocFile.Open(TocPath, BasicFile::Mode::kTruncate); + + m_TocWriter << "version"sv << 1; + m_TocWriter.BeginArray("toc"sv); +} + +void +RecordingFileWriter::Close() +{ + m_TocWriter.EndArray(); + CbObject Toc = m_TocWriter.Save(); + + std::error_code Ec; + m_TocFile.WriteAll(Toc.GetBuffer().AsIoBuffer(), Ec); +} + +void +RecordingFileWriter::AppendObject(const CbObject& Object, const IoHash& ObjectHash) +{ + RwLock::ExclusiveLockScope _(m_FileLock); + + MemoryView ObjectView = Object.GetBuffer().GetView(); + + std::error_code Ec; + m_File.Write(ObjectView, m_FileOffset, Ec); + + if (Ec) + { + throw std::system_error(Ec, "failed writing to archive"); + } + + m_TocWriter.BeginArray(); + m_TocWriter.AddHash(ObjectHash); + m_TocWriter.AddInteger(m_FileOffset); + m_TocWriter.AddInteger(gsl::narrow<int>(ObjectView.GetSize())); + m_TocWriter.EndArray(); + + m_FileOffset += ObjectView.GetSize(); +} + +////////////////////////////////////////////////////////////////////////// + 
+ActionRecorder::ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath) +: m_ChunkResolver(InChunkResolver) +, m_RecordingLogDir(RecordingLogPath) +{ + std::error_code Ec; + CreateDirectories(m_RecordingLogDir, Ec); + + if (Ec) + { + ZEN_WARN("Could not create directory '{}': {}", m_RecordingLogDir, Ec.message()); + } + + CleanDirectory(m_RecordingLogDir, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Could not clean directory '{}': {}", m_RecordingLogDir, Ec.message()); + } + + m_WorkersFile.Open(m_RecordingLogDir / "workers.zdat"); + m_ActionsFile.Open(m_RecordingLogDir / "actions.zdat"); + + CidStoreConfiguration CidConfig; + CidConfig.RootDirectory = m_RecordingLogDir / "cid"; + CidConfig.HugeValueThreshold = 128 * 1024 * 1024; + + m_CidStore.Initialize(CidConfig); +} + +ActionRecorder::~ActionRecorder() +{ + Shutdown(); +} + +void +ActionRecorder::Shutdown() +{ + m_CidStore.Flush(); +} + +void +ActionRecorder::RegisterWorker(const CbPackage& WorkerPackage) +{ + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + + m_WorkersFile.AppendObject(WorkerPackage.GetObject(), WorkerId); + + std::unordered_set<IoHash> AddedChunks; + uint64_t AddedBytes = 0; + + // First add all attachments from the worker package itself + + for (const CbAttachment& Attachment : WorkerPackage.GetAttachments()) + { + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + IoBuffer Data = Buffer.GetCompressed().Flatten().AsIoBuffer(); + + const IoHash ChunkHash = Buffer.DecodeRawHash(); + + CidStore::InsertResult Result = m_CidStore.AddChunk(Data, ChunkHash, CidStore::InsertMode::kCopyOnly); + + AddedChunks.insert(ChunkHash); + + if (Result.New) + { + AddedBytes += Data.GetSize(); + } + } + + // Not all attachments will be present in the worker package, so we need to add + // all referenced chunks to ensure that the recording is self-contained and not + // referencing data in the main CID store + + CbObject 
WorkerDescriptor = WorkerPackage.GetObject(); + + WorkerDescriptor.IterateAttachments([&](const CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + + if (!AddedChunks.contains(AttachmentCid)) + { + IoBuffer AttachmentData = m_ChunkResolver.FindChunkByCid(AttachmentCid); + + if (AttachmentData) + { + CidStore::InsertResult Result = m_CidStore.AddChunk(AttachmentData, AttachmentCid, CidStore::InsertMode::kCopyOnly); + + if (Result.New) + { + AddedBytes += AttachmentData.GetSize(); + } + } + else + { + ZEN_WARN("RegisterWorker: could not resolve attachment chunk {} for worker {}", AttachmentCid, WorkerId); + } + + AddedChunks.insert(AttachmentCid); + } + }); + + ZEN_INFO("recorded worker {} with {} attachments ({} bytes)", WorkerId, AddedChunks.size(), AddedBytes); +} + +bool +ActionRecorder::RecordAction(Ref<RunnerAction> Action) +{ + bool AllGood = true; + + Action->ActionObj.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsHash(); + IoBuffer ChunkData = m_ChunkResolver.FindChunkByCid(AttachData); + + if (ChunkData) + { + if (ChunkData.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash DecompressedHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), /* out */ DecompressedHash, /* out*/ RawSize); + + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + if (Compressed.TryGetCompressParameters(/* out */ Compressor, /* out */ CompressionLevel, /* out */ BlockSize)) + { + if (Compressor == OodleCompressor::NotSet) + { + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + CompressedBuffer NewCompressed = CompressedBuffer::Compress(std::move(Decompressed), + OodleCompressor::Mermaid, + OodleCompressionLevel::Fast, + BlockSize); + + ChunkData = NewCompressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + } + + const uint64_t ChunkSize = 
ChunkData.GetSize(); + + m_CidStore.AddChunk(ChunkData, AttachData, CidStore::InsertMode::kCopyOnly); + ++m_ChunkCounter; + m_ChunkBytesCounter.fetch_add(ChunkSize); + } + else + { + AllGood = false; + + ZEN_WARN("could not resolve chunk {}", AttachData); + } + }); + + if (AllGood) + { + m_ActionsFile.AppendObject(Action->ActionObj, Action->ActionId); + ++m_ActionsCounter; + + return true; + } + else + { + return false; + } +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/actionrecorder.h b/src/zencompute/actionrecorder.h new file mode 100644 index 000000000..9cc2b44a2 --- /dev/null +++ b/src/zencompute/actionrecorder.h @@ -0,0 +1,91 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencompute/functionservice.h> +#include <zencompute/zencompute.h> +#include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> +#include <zenstore/cidstore.h> +#include <zenstore/gc.h> +#include <zenstore/zenstore.h> + +#include <filesystem> +#include <functional> +#include <map> +#include <unordered_map> + +namespace zen { +class CbObject; +class CbPackage; +struct IoHash; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +struct RecordingFileWriter +{ + RecordingFileWriter(RecordingFileWriter&&) = delete; + RecordingFileWriter& operator=(RecordingFileWriter&&) = delete; + + RwLock m_FileLock; + BasicFile m_File; + uint64_t m_FileOffset = 0; + CbObjectWriter m_TocWriter; + BasicFile m_TocFile; + + RecordingFileWriter(); + ~RecordingFileWriter(); + + void Open(std::filesystem::path FilePath); + void Close(); + void AppendObject(const CbObject& Object, const IoHash& ObjectHash); +}; + +////////////////////////////////////////////////////////////////////////// + +/** + * Recording "runner" implementation + * + * This class writes out all actions and their attachments to a recording 
directory + * in a format that can be read back by the RecordingReader. + * + * The contents of the recording directory will be self-contained, with all referenced + * attachments stored in the recording directory itself, so that the recording can be + * moved or shared without needing to maintain references to the main CID store. + * + */ + +class ActionRecorder +{ +public: + ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath); + ~ActionRecorder(); + + ActionRecorder(const ActionRecorder&) = delete; + ActionRecorder& operator=(const ActionRecorder&) = delete; + + void Shutdown(); + void RegisterWorker(const CbPackage& WorkerPackage); + bool RecordAction(Ref<RunnerAction> Action); + +private: + ChunkResolver& m_ChunkResolver; + std::filesystem::path m_RecordingLogDir; + + RecordingFileWriter m_WorkersFile; + RecordingFileWriter m_ActionsFile; + GcManager m_Gc; + CidStore m_CidStore{m_Gc}; + std::atomic<int> m_ChunkCounter{0}; + std::atomic<uint64_t> m_ChunkBytesCounter{0}; + std::atomic<int> m_ActionsCounter{0}; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/functionrunner.cpp b/src/zencompute/functionrunner.cpp new file mode 100644 index 000000000..8e7c12b2b --- /dev/null +++ b/src/zencompute/functionrunner.cpp @@ -0,0 +1,112 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "functionrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/filesystem.h> + +# include <fmt/format.h> +# include <vector> + +namespace zen::compute { + +FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions") +{ +} + +FunctionRunner::~FunctionRunner() = default; + +size_t +FunctionRunner::QueryCapacity() +{ + return 1; +} + +std::vector<SubmitResult> +FunctionRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + Results.reserve(Actions.size()); + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +void +FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject) +{ + if (m_DumpActions) + { + std::string UniqueId = fmt::format("{}.ddb", ActionLsn); + std::filesystem::path Path = m_ActionsPath / UniqueId; + + zen::WriteFile(Path, IoBuffer(ActionObject.GetBuffer().AsIoBuffer())); + } +} + +////////////////////////////////////////////////////////////////////////// + +RunnerAction::RunnerAction(FunctionServiceSession* OwnerSession) : m_OwnerSession(OwnerSession) +{ + this->Timestamps[static_cast<int>(State::New)] = DateTime::Now().GetTicks(); +} + +RunnerAction::~RunnerAction() +{ +} + +void +RunnerAction::SetActionState(State NewState) +{ + ZEN_ASSERT(NewState < State::_Count); + this->Timestamps[static_cast<int>(NewState)] = DateTime::Now().GetTicks(); + + do + { + if (State CurrentState = m_ActionState.load(); CurrentState == NewState) + { + // No state change + return; + } + else + { + if (NewState <= CurrentState) + { + // Cannot transition to an earlier or same state + return; + } + + if (m_ActionState.compare_exchange_strong(CurrentState, NewState)) + { + // Successful state change + + m_OwnerSession->PostUpdate(this); + + return; + } + } + } while (true); +} + +void +RunnerAction::SetResult(CbPackage&& Result) 
+{ + m_Result = std::move(Result); +} + +CbPackage& +RunnerAction::GetResult() +{ + ZEN_ASSERT(IsCompleted()); + return m_Result; +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h new file mode 100644 index 000000000..6fd0d84cc --- /dev/null +++ b/src/zencompute/functionrunner.h @@ -0,0 +1,207 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencompute/functionservice.h> + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <filesystem> +# include <vector> + +namespace zen::compute { + +struct SubmitResult +{ + bool IsAccepted = false; + std::string Reason; +}; + +/** Base interface for classes implementing a remote execution "runner" + */ +class FunctionRunner : public RefCounted +{ + FunctionRunner(FunctionRunner&&) = delete; + FunctionRunner& operator=(FunctionRunner&&) = delete; + +public: + FunctionRunner(std::filesystem::path BasePath); + virtual ~FunctionRunner() = 0; + + virtual void Shutdown() = 0; + virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; + + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0; + [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; + [[nodiscard]] virtual bool IsHealthy() = 0; + [[nodiscard]] virtual size_t QueryCapacity(); + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); + +protected: + std::filesystem::path m_ActionsPath; + bool m_DumpActions = false; + void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject); +}; + +template<typename RunnerType> +struct RunnerGroup +{ + void AddRunner(RunnerType* Runner) + { + m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); + } + size_t QueryCapacity() + { + size_t TotalCapacity = 0; + m_RunnersLock.WithSharedLock([&] { + for (const auto& Runner : m_Runners) + { + TotalCapacity += Runner->QueryCapacity(); + } + }); + return TotalCapacity; + } + + SubmitResult SubmitAction(Ref<RunnerAction> Action) + { + RwLock::SharedLockScope _(m_RunnersLock); + + const int InitialIndex = 
m_NextSubmitIndex.load(std::memory_order_acquire); + int Index = InitialIndex; + const int RunnerCount = gsl::narrow<int>(m_Runners.size()); + + if (RunnerCount == 0) + { + return {.IsAccepted = false, .Reason = "No runners available"}; + } + + do + { + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + + auto& Runner = m_Runners[Index++]; + + SubmitResult Result = Runner->SubmitAction(Action); + + if (Result.IsAccepted == true) + { + m_NextSubmitIndex = Index % RunnerCount; + + return Result; + } + + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + } while (Index != InitialIndex); + + return {.IsAccepted = false}; + } + + size_t GetSubmittedActionCount() + { + RwLock::SharedLockScope _(m_RunnersLock); + + size_t TotalCount = 0; + + for (const auto& Runner : m_Runners) + { + TotalCount += Runner->GetSubmittedActionCount(); + } + + return TotalCount; + } + + void RegisterWorker(CbPackage Worker) + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->RegisterWorker(Worker); + } + } + + void Shutdown() + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->Shutdown(); + } + } + +private: + RwLock m_RunnersLock; + std::vector<Ref<RunnerType>> m_Runners; + std::atomic<int> m_NextSubmitIndex{0}; +}; + +/** + * This represents an action going through different stages of scheduling and execution. 
+ */ +struct RunnerAction : public RefCounted +{ + explicit RunnerAction(FunctionServiceSession* OwnerSession); + ~RunnerAction(); + + int ActionLsn = 0; + WorkerDesc Worker; + IoHash ActionId; + CbObject ActionObj; + int Priority = 0; + + enum class State + { + New, + Pending, + Running, + Completed, + Failed, + _Count + }; + + static const char* ToString(State _) + { + switch (_) + { + case State::New: + return "New"; + case State::Pending: + return "Pending"; + case State::Running: + return "Running"; + case State::Completed: + return "Completed"; + case State::Failed: + return "Failed"; + default: + return "Unknown"; + } + } + + uint64_t Timestamps[static_cast<int>(State::_Count)] = {}; + + State ActionState() const { return m_ActionState; } + void SetActionState(State NewState); + + bool IsSuccess() const { return ActionState() == State::Completed; } + bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; } + + void SetResult(CbPackage&& Result); + CbPackage& GetResult(); + +private: + std::atomic<State> m_ActionState = State::New; + FunctionServiceSession* m_OwnerSession = nullptr; + CbPackage m_Result; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp new file mode 100644 index 000000000..0698449e9 --- /dev/null +++ b/src/zencompute/functionservice.cpp @@ -0,0 +1,957 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" +# include "actionrecorder.h" +# include "localrunner.h" +# include "remotehttprunner.h" + +# include <zencompute/recordingreader.h> +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zentelemetry/stats.h> + +# include <set> +# include <deque> +# include <map> +# include <thread> +# include <unordered_map> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <EASTL/hash_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace std::literals; + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +struct FunctionServiceSession::Impl +{ + FunctionServiceSession* m_FunctionServiceSession; + ChunkResolver& m_ChunkResolver; + LoggerRef m_Log{logging::Get("apply")}; + + Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver) + : m_FunctionServiceSession(InFunctionServiceSession) + , m_ChunkResolver(InChunkResolver) + { + m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this}; + } + + void Shutdown(); + bool IsHealthy(); + + LoggerRef Log() { return m_Log; } + + std::atomic_bool m_AcceptActions = true; + + struct FunctionDefinition + { + std::string FunctionName; + Guid FunctionVersion; + Guid BuildSystemVersion; + IoHash WorkerId; + }; + + void 
EmitStats(CbObjectWriter& Cbo) + { + m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); }); + m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); }); + m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); }); + Cbo << "actions_submitted"sv << GetSubmittedActionCount(); + EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo); + } + + void RegisterWorker(CbPackage Worker); + WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); + + std::atomic<int32_t> m_ActionsCounter = 0; // sequence number + + RwLock m_PendingLock; + std::map<int, Ref<RunnerAction>> m_PendingActions; + + RwLock m_RunningLock; + std::unordered_map<int, Ref<RunnerAction>> m_RunningMap; + + RwLock m_ResultsLock; + std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap; + metrics::Meter m_ResultRate; + std::atomic<uint64_t> m_RetiredCount{0}; + + HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); + HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); + + std::atomic<bool> m_ShutdownRequested{false}; + + std::thread m_SchedulingThread; + std::atomic<bool> m_SchedulingThreadEnabled{true}; + Event m_SchedulingThreadEvent; + + void MonitorThreadFunction(); + void SchedulePendingActions(); + + // Workers + + RwLock m_WorkerLock; + std::unordered_map<IoHash, CbPackage> m_WorkerMap; + std::vector<FunctionDefinition> m_FunctionList; + std::vector<IoHash> GetKnownWorkerIds(); + + // Runners + + RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup; + RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup; + + EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); + EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority); + + void GetCompleted(CbWriter& Cbo); + + // Recording + + void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); + void StopRecording(); + + 
std::unique_ptr<ActionRecorder> m_Recorder; + + // History tracking + + RwLock m_ActionHistoryLock; + std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory; + size_t m_HistoryLimit = 1000; + + std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit); + + // + + [[nodiscard]] size_t QueryCapacity(); + + [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action); + [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); + [[nodiscard]] size_t GetSubmittedActionCount(); + + // Updates + + RwLock m_UpdatedActionsLock; + std::vector<Ref<RunnerAction>> m_UpdatedActions; + + void HandleActionUpdates(); + void PostUpdate(RunnerAction* Action); + + void ShutdownRunners(); +}; + +bool +FunctionServiceSession::Impl::IsHealthy() +{ + return true; +} + +void +FunctionServiceSession::Impl::Shutdown() +{ + m_AcceptActions = false; + m_ShutdownRequested = true; + + m_SchedulingThreadEnabled = false; + m_SchedulingThreadEvent.Set(); + if (m_SchedulingThread.joinable()) + { + m_SchedulingThread.join(); + } + + ShutdownRunners(); +} + +void +FunctionServiceSession::Impl::ShutdownRunners() +{ + m_LocalRunnerGroup.Shutdown(); + m_RemoteRunnerGroup.Shutdown(); +} + +void +FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) +{ + ZEN_INFO("starting recording to '{}'", RecordingPath); + + m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath); + + ZEN_INFO("started recording to '{}'", RecordingPath); +} + +void +FunctionServiceSession::Impl::StopRecording() +{ + ZEN_INFO("stopping recording"); + + m_Recorder = nullptr; + + ZEN_INFO("stopped recording"); +} + +std::vector<FunctionServiceSession::ActionHistoryEntry> +FunctionServiceSession::Impl::GetActionHistory(int Limit) +{ + RwLock::SharedLockScope _(m_ActionHistoryLock); + + if (Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size()) + { + return 
std::vector<ActionHistoryEntry>(m_ActionHistory.end() - Limit, m_ActionHistory.end()); + } + + return std::vector<ActionHistoryEntry>(m_ActionHistory.begin(), m_ActionHistory.end()); +} + +void +FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker) +{ + RwLock::ExclusiveLockScope _(m_WorkerLock); + + const IoHash& WorkerId = Worker.GetObject().GetHash(); + + if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second) + { + // Note that since the convention currently is that WorkerId is equal to the hash + // of the worker descriptor there is no chance that we get a second write with a + // different descriptor. Thus we only need to call this the first time, when the + // worker is added + + m_LocalRunnerGroup.RegisterWorker(Worker); + m_RemoteRunnerGroup.RegisterWorker(Worker); + + if (m_Recorder) + { + m_Recorder->RegisterWorker(Worker); + } + + CbObject WorkerObj = Worker.GetObject(); + + // Populate worker database + + const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerObj["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } + } +} + +WorkerDesc +FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId) +{ + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) + { + const CbPackage& Desc = It->second; + return {Desc, WorkerId}; + } + + return {}; +} + +std::vector<IoHash> +FunctionServiceSession::Impl::GetKnownWorkerIds() +{ + std::vector<IoHash> WorkerIds; + WorkerIds.reserve(m_WorkerMap.size()); + + m_WorkerLock.WithSharedLock([&] { + for (const auto& [WorkerId, _] : 
m_WorkerMap) + { + WorkerIds.push_back(WorkerId); + } + }); + + return WorkerIds; +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority) +{ + // Resolve function to worker + + IoHash WorkerId{IoHash::Zero}; + + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + for (const FunctionDefinition& FuncDef : m_FunctionList) + { + if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion && + FuncDef.BuildSystemVersion == BuildSystemVersion) + { + WorkerId = FuncDef.WorkerId; + + break; + } + } + + if (WorkerId == IoHash::Zero) + { + CbObjectWriter Writer; + + Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; + Writer << "error" + << "no worker matches the action specification"; + + return {0, Writer.Save()}; + } + + if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) + { + CbPackage WorkerPackage = It->second; + + return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority); + } + + CbObjectWriter Writer; + + Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; + Writer << "error" + << "no worker found despite match"; + + return {0, Writer.Save()}; +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) +{ + const int ActionLsn = ++m_ActionsCounter; + + Ref<RunnerAction> Pending{new RunnerAction(m_FunctionServiceSession)}; + + Pending->ActionLsn = ActionLsn; + Pending->Worker = Worker; + Pending->ActionId = ActionObj.GetHash(); + Pending->ActionObj = ActionObj; + Pending->Priority = RequestPriority; + + 
SubmitResult SubResult = SubmitAction(Pending); + + if (SubResult.IsAccepted) + { + // Great, the job is being taken care of by the runner + ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn); + } + else + { + ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); + + Pending->SetActionState(RunnerAction::State::Pending); + } + + if (m_Recorder) + { + m_Recorder->RecordAction(Pending); + } + + CbObjectWriter Writer; + Writer << "lsn" << Pending->ActionLsn; + Writer << "worker" << Pending->Worker.WorkerId; + Writer << "action" << Pending->ActionId; + + return {Pending->ActionLsn, Writer.Save()}; +} + +SubmitResult +FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action) +{ + // Loosely round-robin scheduling of actions across runners. + // + // It's not entirely clear what this means given that submits + // can come in across multiple threads, but it's probably better + // than always starting with the first runner. + // + // Longer term we should track the state of the individual + // runners and make decisions accordingly. 
+ + SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action); + if (Result.IsAccepted) + { + return Result; + } + + return m_RemoteRunnerGroup.SubmitAction(Action); +} + +size_t +FunctionServiceSession::Impl::GetSubmittedActionCount() +{ + return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount(); +} + +HttpResponseCode +FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) +{ + // This lock is held for the duration of the function since we need to + // be sure that the action doesn't change state while we are checking the + // different data structures + + RwLock::ExclusiveLockScope _(m_ResultsLock); + + if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) + { + OutResultPackage = std::move(It->second->GetResult()); + + m_ResultsMap.erase(It); + + return HttpResponseCode::OK; + } + + { + RwLock::SharedLockScope __(m_PendingLock); + + if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) + { + return HttpResponseCode::Accepted; + } + } + + // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must + // always be taken after m_ResultsLock if both are needed + + { + RwLock::SharedLockScope __(m_RunningLock); + + if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) + { + return HttpResponseCode::Accepted; + } + } + + return HttpResponseCode::NotFound; +} + +HttpResponseCode +FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) +{ + // This lock is held for the duration of the function since we need to + // be sure that the action doesn't change state while we are checking the + // different data structures + + RwLock::ExclusiveLockScope _(m_ResultsLock); + + for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) + { + if (It->second->ActionId == ActionId) + { + OutResultPackage = std::move(It->second->GetResult()); + + m_ResultsMap.erase(It); + + 
return HttpResponseCode::OK; + } + } + + { + RwLock::SharedLockScope __(m_PendingLock); + + for (const auto& [K, Pending] : m_PendingActions) + { + if (Pending->ActionId == ActionId) + { + return HttpResponseCode::Accepted; + } + } + } + + // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must + // always be taken after m_ResultsLock if both are needed + + { + RwLock::SharedLockScope __(m_RunningLock); + + for (const auto& [K, v] : m_RunningMap) + { + if (v->ActionId == ActionId) + { + return HttpResponseCode::Accepted; + } + } + } + + return HttpResponseCode::NotFound; +} + +void +FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo) +{ + Cbo.BeginArray("completed"); + + m_ResultsLock.WithSharedLock([&] { + for (auto& Kv : m_ResultsMap) + { + Cbo << Kv.first; + } + }); + + Cbo.EndArray(); +} + +# define ZEN_BATCH_SCHEDULER 1 + +void +FunctionServiceSession::Impl::SchedulePendingActions() +{ + int ScheduledCount = 0; + size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); + size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); + size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); + + static Stopwatch DumpRunningTimer; + + auto _ = MakeGuard([&] { + ZEN_INFO("scheduled {} pending actions. 
{} running ({} retired), {} still pending, {} results", + ScheduledCount, + RunningCount, + m_RetiredCount.load(), + PendingCount, + ResultCount); + + if (DumpRunningTimer.GetElapsedTimeMs() > 30000) + { + DumpRunningTimer.Reset(); + + std::set<int> RunningList; + m_RunningLock.WithSharedLock([&] { + for (auto& [K, V] : m_RunningMap) + { + RunningList.insert(K); + } + }); + + ExtendableStringBuilder<1024> RunningString; + for (int i : RunningList) + { + if (RunningString.Size()) + { + RunningString << ", "; + } + + RunningString.Append(IntNum(i)); + } + + ZEN_INFO("running: {}", RunningString); + } + }); + +# if ZEN_BATCH_SCHEDULER + size_t Capacity = QueryCapacity(); + + if (!Capacity) + { + _.Dismiss(); + + return; + } + + std::vector<Ref<RunnerAction>> ActionsToSchedule; + + // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock + + m_PendingLock.WithExclusiveLock([&] { + if (m_ShutdownRequested) + { + return; + } + + if (m_PendingActions.empty()) + { + return; + } + + size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size()); + + auto PendingIt = m_PendingActions.begin(); + const auto PendingEnd = m_PendingActions.end(); + + while (NumActionsToSchedule && PendingIt != PendingEnd) + { + const Ref<RunnerAction>& Pending = PendingIt->second; + + switch (Pending->ActionState()) + { + case RunnerAction::State::Pending: + ActionsToSchedule.push_back(Pending); + break; + + case RunnerAction::State::Running: + case RunnerAction::State::Completed: + case RunnerAction::State::Failed: + break; + + default: + case RunnerAction::State::New: + ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn); + break; + } + + ++PendingIt; + --NumActionsToSchedule; + } + + PendingCount = m_PendingActions.size(); + }); + + if (ActionsToSchedule.empty()) + { + _.Dismiss(); + return; + } + + ZEN_INFO("attempting schedule of {} pending actions", 
ActionsToSchedule.size()); + + auto SubmitResults = SubmitActions(ActionsToSchedule); + + // Move successfully scheduled actions to the running map and remove + // from pending queue. It's actually possible that by the time we get + // to this stage some of the actions may have already completed, so + // they should not always be added to the running map + + eastl::hash_set<int> ScheduledActions; + + for (size_t i = 0; i < ActionsToSchedule.size(); ++i) + { + const Ref<RunnerAction>& Pending = ActionsToSchedule[i]; + const SubmitResult& SubResult = SubmitResults[i]; + + if (SubResult.IsAccepted) + { + ScheduledActions.insert(Pending->ActionLsn); + } + } + + ScheduledCount += (int)ActionsToSchedule.size(); + +# else + m_PendingLock.WithExclusiveLock([&] { + while (!m_PendingActions.empty()) + { + if (m_ShutdownRequested) + { + return; + } + + // Here it would be good if we could decide to pop immediately to avoid + // holding the lock while creating processes etc + const Ref<RunnerAction>& Pending = m_PendingActions.begin()->second; + FunctionRunner::SubmitResult SubResult = SubmitAction(Pending); + + if (SubResult.IsAccepted) + { + // Great, the job is being taken care of by the runner + + ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn); + + m_RunningLock.WithExclusiveLock([&] { + m_RunningMap.insert({Pending->ActionLsn, Pending}); + + RunningCount = m_RunningMap.size(); + }); + + m_PendingActions.pop_front(); + + PendingCount = m_PendingActions.size(); + ++ScheduledCount; + } + else + { + // Runner could not accept the job, leave it on the pending queue + + return; + } + } + }); +# endif +} + +void +FunctionServiceSession::Impl::MonitorThreadFunction() +{ + SetCurrentThreadName("FunctionServiceSession_Monitor"); + + auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); + + do + { + int TimeoutMs = 1000; + + if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); })) + { + TimeoutMs = 100; + } + + 
const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs); + + if (m_SchedulingThreadEnabled == false) + { + return; + } + + HandleActionUpdates(); + + // Schedule pending actions + + SchedulePendingActions(); + + if (!Timedout) + { + m_SchedulingThreadEvent.Reset(); + } + } while (m_SchedulingThreadEnabled); +} + +void +FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action) +{ + m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); }); +} + +void +FunctionServiceSession::Impl::HandleActionUpdates() +{ + std::vector<Ref<RunnerAction>> UpdatedActions; + + m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); }); + + std::unordered_set<int> SeenLsn; + std::unordered_set<int> RunningLsn; + + for (Ref<RunnerAction>& Action : UpdatedActions) + { + const int ActionLsn = Action->ActionLsn; + + if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted) + { + switch (Action->ActionState()) + { + case RunnerAction::State::Pending: + m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); + break; + + case RunnerAction::State::Running: + m_PendingLock.WithExclusiveLock([&] { + m_RunningLock.WithExclusiveLock([&] { + m_RunningMap.insert({ActionLsn, Action}); + m_PendingActions.erase(ActionLsn); + }); + }); + ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); + break; + + case RunnerAction::State::Completed: + case RunnerAction::State::Failed: + m_ResultsLock.WithExclusiveLock([&] { + m_ResultsMap[ActionLsn] = Action; + + m_PendingLock.WithExclusiveLock([&] { + m_RunningLock.WithExclusiveLock([&] { + if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) + { + m_PendingActions.erase(ActionLsn); + } + else + { + m_RunningMap.erase(FindIt); + } + }); + }); + + m_ActionHistoryLock.WithExclusiveLock([&] { + ActionHistoryEntry Entry{.Lsn = ActionLsn, + .ActionId = Action->ActionId, + .WorkerId = Action->Worker.WorkerId, + 
.ActionDescriptor = Action->ActionObj, + .Succeeded = Action->ActionState() == RunnerAction::State::Completed}; + + std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps)); + + m_ActionHistory.push_back(std::move(Entry)); + + if (m_ActionHistory.size() > m_HistoryLimit) + { + m_ActionHistory.pop_front(); + } + }); + }); + m_RetiredCount.fetch_add(1); + m_ResultRate.Mark(1); + ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", + Action->ActionId, + ActionLsn, + Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); + break; + } + } + } +} + +size_t +FunctionServiceSession::Impl::QueryCapacity() +{ + return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity(); +} + +std::vector<SubmitResult> +FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +////////////////////////////////////////////////////////////////////////// + +FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver) +{ + m_Impl = std::make_unique<Impl>(this, InChunkResolver); +} + +FunctionServiceSession::~FunctionServiceSession() +{ + Shutdown(); +} + +bool +FunctionServiceSession::IsHealthy() +{ + return m_Impl->IsHealthy(); +} + +void +FunctionServiceSession::Shutdown() +{ + m_Impl->Shutdown(); +} + +void +FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) +{ + m_Impl->StartRecording(InResolver, RecordingPath); +} + +void +FunctionServiceSession::StopRecording() +{ + m_Impl->StopRecording(); +} + +void +FunctionServiceSession::EmitStats(CbObjectWriter& Cbo) +{ + m_Impl->EmitStats(Cbo); +} + +std::vector<IoHash> +FunctionServiceSession::GetKnownWorkerIds() +{ + return m_Impl->GetKnownWorkerIds(); +} + +WorkerDesc 
+FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId) +{ + return m_Impl->GetWorkerDescriptor(WorkerId); +} + +void +FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath) +{ + m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath)); +} + +void +FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName) +{ + m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName)); +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority) +{ + return m_Impl->EnqueueAction(ActionObject, Priority); +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) +{ + return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority); +} + +void +FunctionServiceSession::RegisterWorker(CbPackage Worker) +{ + m_Impl->RegisterWorker(Worker); +} + +HttpResponseCode +FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) +{ + return m_Impl->GetActionResult(ActionLsn, OutResultPackage); +} + +HttpResponseCode +FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) +{ + return m_Impl->FindActionResult(ActionId, OutResultPackage); +} + +std::vector<FunctionServiceSession::ActionHistoryEntry> +FunctionServiceSession::GetActionHistory(int Limit) +{ + return m_Impl->GetActionHistory(Limit); +} + +void +FunctionServiceSession::GetCompleted(CbWriter& Cbo) +{ + m_Impl->GetCompleted(Cbo); +} + +void +FunctionServiceSession::PostUpdate(RunnerAction* Action) +{ + m_Impl->PostUpdate(Action); +} + +////////////////////////////////////////////////////////////////////////// + +void +function_forcelink() +{ +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff 
--git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp
new file mode 100644
index 000000000..09a9684a7
--- /dev/null
+++ b/src/zencompute/httpfunctionservice.cpp
@@ -0,0 +1,724 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httpfunctionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+constinit AsciiSet g_DecimalSet("0123456789");
+auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
+
+constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");
+auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
+
+/**
+ * Registers the "apply" stats handler and the HTTP routes (ready, workers, jobs, sysinfo,
+ * record), backed by a FunctionServiceSession with one local runner rooted at
+ * BaseDir / "local".
+ */
+HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
+                                         IHttpStatsService& StatsService,
+                                         [[maybe_unused]] const std::filesystem::path& BaseDir)
+: m_CidStore(InCidStore)
+, m_StatsService(StatsService)
+, m_Log(logging::Get("apply"))
+, m_BaseDir(BaseDir)
+, m_FunctionService(InCidStore)
+{
+    m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local");
+
+    m_StatsService.RegisterHandler("apply", *this);
+
+    m_Router.AddMatcher("lsn", DecimalMatcher);
+    m_Router.AddMatcher("worker", IoHashMatcher);
+    m_Router.AddMatcher("action", IoHashMatcher);
+
+    m_Router.RegisterRoute(
+        "ready",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            if (m_FunctionService.IsHealthy())
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
+            }
+
+            return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "workers",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("workers"sv);
+            for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds())
+            {
+                Cbo << WorkerId;
+            }
+            Cbo.EndArray();
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "workers/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId))
+                    {
+                        return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject());
+                    }
+                    return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+
+                case HttpVerb::kPost:
+                    {
+                        switch (HttpReq.RequestContentType())
+                        {
+                            case HttpContentType::kCbObject:
+                                {
+                                    CbObject WorkerSpec = HttpReq.ReadPayloadObject();
+
+                                    // Determine which pieces are missing and need to be transmitted
+
+                                    HashKeySet ChunkSet;
+
+                                    WorkerSpec.IterateAttachments([&](CbFieldView Field) {
+                                        const IoHash Hash = Field.AsHash();
+                                        ChunkSet.AddHashToSet(Hash);
+                                    });
+
+                                    CbPackage WorkerPackage;
+                                    WorkerPackage.SetObject(WorkerSpec);
+
+                                    m_CidStore.FilterChunks(ChunkSet);
+
+                                    if (ChunkSet.IsEmpty())
+                                    {
+                                        ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
+                                        m_FunctionService.RegisterWorker(WorkerPackage);
+                                        return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+                                    }
+
+                                    CbObjectWriter ResponseWriter;
+                                    ResponseWriter.BeginArray("need");
+
+                                    ChunkSet.IterateHashes([&](const IoHash& Hash) {
+                                        ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
+                                        ResponseWriter.AddHash(Hash);
+                                    });
+
+                                    ResponseWriter.EndArray();
+
+                                    ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
+                                }
+                                break;
+
+                            case HttpContentType::kCbPackage:
+                                {
+                                    CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage();
+                                    CbObject WorkerSpec = WorkerSpecPackage.GetObject();
+
+                                    std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments();
+
+                                    int AttachmentCount = 0;
+                                    int NewAttachmentCount = 0;
+                                    uint64_t TotalAttachmentBytes = 0;
+                                    uint64_t TotalNewBytes = 0;
+
+                                    for (const CbAttachment& Attachment : Attachments)
+                                    {
+                                        ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                                        const IoHash DataHash = Attachment.GetHash();
+                                        CompressedBuffer Buffer = Attachment.AsCompressedBinary();
+
+                                        ZEN_UNUSED(DataHash);
+                                        TotalAttachmentBytes += Buffer.GetCompressedSize();
+                                        ++AttachmentCount;
+
+                                        const CidStore::InsertResult InsertResult =
+                                            m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                                        if (InsertResult.New)
+                                        {
+                                            TotalNewBytes += Buffer.GetCompressedSize();
+                                            ++NewAttachmentCount;
+                                        }
+                                    }
+
+                                    ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+                                              WorkerId,
+                                              zen::NiceBytes(TotalAttachmentBytes),
+                                              AttachmentCount,
+                                              zen::NiceBytes(TotalNewBytes),
+                                              NewAttachmentCount);
+
+                                    m_FunctionService.RegisterWorker(WorkerSpecPackage);
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+                                }
+                                break;
+
+                            default:
+                                break;
+                        }
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "jobs/completed",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObjectWriter Cbo;
+            m_FunctionService.GetCompleted(Cbo);
+
+            SystemMetrics Sm = GetSystemMetricsForReporting();
+            Cbo.BeginObject("metrics");
+            Describe(Sm, Cbo);
+            Cbo.EndObject();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "jobs/history",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+            const auto QueryParams = HttpReq.GetQueryParams();
+
+            int QueryLimit = 50;
+
+            if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
+            {
+                QueryLimit = ParseInt<int>(LimitParam).value_or(50);
+            }
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("history");
+            for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit))
+            {
+                Cbo.BeginObject();
+                Cbo << "lsn"sv << Entry.Lsn;
+                Cbo << "actionId"sv << Entry.ActionId;
+                Cbo << "workerId"sv << Entry.WorkerId;
+                Cbo << "succeeded"sv << Entry.Succeeded;
+                Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
+
+                for (const auto& Timestamp : Entry.Timestamps)  // the slot index doubles as the RunnerAction::State value
+                {
+                    Cbo.AddInteger(
+                        fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))),
+                        Timestamp);
+                }
+                Cbo.EndObject();
+            }
+            Cbo.EndArray();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "jobs/{lsn}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+            // The "lsn" matcher only guarantees decimal digits, so the value may still not fit
+            // in an int; answer 400 instead of letting std::stoi throw std::out_of_range
+            const auto ParsedLsn = ParseInt<int>(Req.GetCapture(1));
+
+            if (!ParsedLsn)
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+            }
+
+            const int ActionLsn = *ParsedLsn;
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    {
+                        CbPackage Output;
+                        HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output);
+
+                        if (ResponseCode == HttpResponseCode::OK)
+                        {
+                            return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                        }
+
+                        return HttpReq.WriteResponse(ResponseCode);
+                    }
+                    break;
+
+                case HttpVerb::kPost:
+                    {
+                        // Add support for cancellation, priority changes
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "jobs/{worker}/{action}",  // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
+                                   // one which uses the scheduled action lsn for lookups
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+            const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+            CbPackage Output;
+            if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output);
+                ResponseCode != HttpResponseCode::OK)
+            {
+                ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode));
+
+                return HttpReq.WriteResponse(ResponseCode);
+            }
+
+            ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2));
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "jobs/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+            const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId);
+
+            if (!Worker)
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+            }
+
+            const auto QueryParams = Req.ServerRequest().GetQueryParams();
+
+            int RequestPriority = -1;
+
+            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+            {
+                RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+            }
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    // TODO: return status of all pending or executing jobs
+                    break;
+
+                case HttpVerb::kPost:
+                    switch (HttpReq.RequestContentType())
+                    {
+                        case HttpContentType::kCbObject:
+                            {
+                                // This operation takes the proposed job spec and identifies which
+                                // chunks are not present on this server. This list is then returned in
+                                // the "need" list in the response
+
+                                IoBuffer Payload = HttpReq.ReadPayload();
+                                CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+                                std::vector<IoHash> NeedList;
+
+                                ActionObj.IterateAttachments([&](CbFieldView Field) {
+                                    const IoHash FileHash = Field.AsHash();
+
+                                    if (!m_CidStore.ContainsChunk(FileHash))
+                                    {
+                                        NeedList.push_back(FileHash);
+                                    }
+                                });
+
+                                if (NeedList.empty())
+                                {
+                                    // We already have everything, enqueue the action for execution
+
+                                    if (FunctionServiceSession::EnqueueResult Result =
+                                            m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+                                    {
+                                        ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);
+
+                                        return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                                    }
+                                    else
+                                    {
+                                        // Same status as the "jobs" route, so a rejected action is never left unanswered
+                                        return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                                    }
+                                }
+
+                                CbObjectWriter Cbo;
+                                Cbo.BeginArray("need");
+
+                                for (const IoHash& Hash : NeedList)
+                                {
+                                    Cbo << Hash;
+                                }
+
+                                Cbo.EndArray();
+                                CbObject Response = Cbo.Save();
+
+                                return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+                            }
+                            break;
+
+                        case HttpContentType::kCbPackage:
+                            {
+                                CbPackage Action = HttpReq.ReadPayloadPackage();
+                                CbObject ActionObj = Action.GetObject();
+
+                                std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+                                int AttachmentCount = 0;
+                                int NewAttachmentCount = 0;
+                                uint64_t TotalAttachmentBytes = 0;
+                                uint64_t TotalNewBytes = 0;
+
+                                for (const CbAttachment& Attachment : Attachments)
+                                {
+                                    ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                                    const IoHash DataHash = Attachment.GetHash();
+                                    CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+                                    ZEN_UNUSED(DataHash);
+
+                                    const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+                                    TotalAttachmentBytes += CompressedSize;
+                                    ++AttachmentCount;
+
+                                    const CidStore::InsertResult InsertResult =
+                                        m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                                    if (InsertResult.New)
+                                    {
+                                        TotalNewBytes += CompressedSize;
+                                        ++NewAttachmentCount;
+                                    }
+                                }
+
+                                if (FunctionServiceSession::EnqueueResult Result =
+                                        m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+                                {
+                                    ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
+                                              ActionObj.GetHash(),
+                                              Result.Lsn,
+                                              zen::NiceBytes(TotalAttachmentBytes),
+                                              AttachmentCount,
+                                              zen::NiceBytes(TotalNewBytes),
+                                              NewAttachmentCount);
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                                }
+                                else
+                                {
+                                    // Same status as the "jobs" route, so a rejected action is never left unanswered
+                                    return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                                }
+                            }
+                            break;
+
+                        default:
+                            break;
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "jobs",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            const auto QueryParams = HttpReq.GetQueryParams();
+
+            int RequestPriority = -1;
+
+            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+            {
+                RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+            }
+
+            // Resolve worker
+
+            //
+
+            switch (HttpReq.RequestContentType())
+            {
+                case HttpContentType::kCbObject:
+                    {
+                        // This operation takes the proposed job spec and identifies which
+                        // chunks are not present on this server. This list is then returned in
+                        // the "need" list in the response
+
+                        IoBuffer Payload = HttpReq.ReadPayload();
+                        CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+                        std::vector<IoHash> NeedList;
+
+                        ActionObj.IterateAttachments([&](CbFieldView Field) {
+                            const IoHash FileHash = Field.AsHash();
+
+                            if (!m_CidStore.ContainsChunk(FileHash))
+                            {
+                                NeedList.push_back(FileHash);
+                            }
+                        });
+
+                        if (NeedList.empty())
+                        {
+                            // We already have everything, enqueue the action for execution
+
+                            if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
+                            {
+                                ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);
+
+                                return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                            }
+                            else
+                            {
+                                // Could not resolve?
+                                return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                            }
+                        }
+
+                        CbObjectWriter Cbo;
+                        Cbo.BeginArray("need");
+
+                        for (const IoHash& Hash : NeedList)
+                        {
+                            Cbo << Hash;
+                        }
+
+                        Cbo.EndArray();
+                        CbObject Response = Cbo.Save();
+
+                        return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+                    }
+
+                case HttpContentType::kCbPackage:
+                    {
+                        CbPackage Action = HttpReq.ReadPayloadPackage();
+                        CbObject ActionObj = Action.GetObject();
+
+                        std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+                        int AttachmentCount = 0;
+                        int NewAttachmentCount = 0;
+                        uint64_t TotalAttachmentBytes = 0;
+                        uint64_t TotalNewBytes = 0;
+
+                        for (const CbAttachment& Attachment : Attachments)
+                        {
+                            ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                            const IoHash DataHash = Attachment.GetHash();
+                            CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+                            ZEN_UNUSED(DataHash);
+
+                            const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+                            TotalAttachmentBytes += CompressedSize;
+                            ++AttachmentCount;
+
+                            const CidStore::InsertResult InsertResult =
+                                m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                            if (InsertResult.New)
+                            {
+                                TotalNewBytes += CompressedSize;
+                                ++NewAttachmentCount;
+                            }
+                        }
+
+                        if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
+                        {
+                            ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
+                                      Result.Lsn,
+                                      zen::NiceBytes(TotalAttachmentBytes),
+                                      AttachmentCount,
+                                      zen::NiceBytes(TotalNewBytes),
+                                      NewAttachmentCount);
+
+                            HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                        }
+                        else
+                        {
+                            // Could not resolve?
+                            return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                        }
+                    }
+                    return;
+            }
+        },
+        HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "workers/all",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            std::vector<IoHash> WorkerIds = m_FunctionService.GetKnownWorkerIds();
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("workers");
+
+            for (const IoHash& WorkerId : WorkerIds)
+            {
+                Cbo.BeginObject();
+
+                Cbo << "id" << WorkerId;
+
+                const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId);
+
+                Cbo << "descriptor" << Descriptor.Descriptor.GetObject();
+
+                Cbo.EndObject();
+            }
+
+            Cbo.EndArray();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "sysinfo",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            SystemMetrics Sm = GetSystemMetricsForReporting();
+
+            CbObjectWriter Cbo;
+            Describe(Sm, Cbo);
+
+            Cbo << "cpu_usage" << Sm.CpuUsagePercent;
+            Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
+            Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
+            Cbo << "disk_used" << 100 * 1024;          // NOTE(review): literal placeholder, not a measured value
+            Cbo << "disk_total" << 100 * 1024 * 1024;  // NOTE(review): literal placeholder, not a measured value
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "record/start",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording");
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK);
+        },
+        HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "record/stop",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            m_FunctionService.StopRecording();
+
+            return HttpReq.WriteResponse(HttpResponseCode::OK);
+        },
+        HttpVerb::kPost);
+}
+
+HttpFunctionService::~HttpFunctionService()
+{
+    m_StatsService.UnregisterHandler("apply", *this);
+}
+
+void
+HttpFunctionService::Shutdown()
+{
+    m_FunctionService.Shutdown();
+}
+
+const char*
+HttpFunctionService::BaseUri() const
+{
+    return "/apply/";
+}
+
+void
+HttpFunctionService::HandleRequest(HttpServerRequest& Request)
+{
+    metrics::OperationTiming::Scope $(m_HttpRequests);
+
+    if (m_Router.HandleRequest(Request) == false)
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+void
+HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request)
+{
+    CbObjectWriter Cbo;
+    m_FunctionService.EmitStats(Cbo);
+
+    Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+httpfunction_forcelink()
+{
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/httporchestrator.cpp b/src/zencompute/httporchestrator.cpp
new file mode 100644
index 000000000..39e7e60d7
--- /dev/null
+++ b/src/zencompute/httporchestrator.cpp
@@ -0,0 +1,81 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httporchestrator.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/logging.h>
+
+namespace zen::compute {
+
+HttpOrchestratorService::HttpOrchestratorService() : m_Log(logging::Get("orch"))
+{
+    m_Router.RegisterRoute(
+        "provision",  // lists every announced worker with the ms elapsed since its last announce
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObjectWriter Cbo;
+            Cbo.BeginArray("workers");
+
+            m_KnownWorkersLock.WithSharedLock([&] {  // read-only walk of the registry
+                for (const auto& [WorkerId, Worker] : m_KnownWorkers)
+                {
+                    Cbo.BeginObject();
+                    Cbo << "uri" << Worker.BaseUri;
+                    Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs();
+                    Cbo.EndObject();
+                }
+            });
+
+            Cbo.EndArray();
+
+            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+        },
+        HttpVerb::kPost);
+
+    m_Router.RegisterRoute(
+        "announce",  // inserts or refreshes a worker entry from the "id" / "uri" payload fields
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            CbObject Data = HttpReq.ReadPayloadObject();
+
+            std::string_view WorkerId = Data["id"].AsString("");
+            std::string_view WorkerUri = Data["uri"].AsString("");
+
+            if (WorkerId.empty() || WorkerUri.empty())  // presumably "" is what AsString("") yields for a missing field
+            {
+                return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+            }
+
+            m_KnownWorkersLock.WithExclusiveLock([&] {
+                auto& Worker = m_KnownWorkers[std::string(WorkerId)];
+                Worker.BaseUri = WorkerUri;  // NOTE(review): WorkerUri views Data, which dies with this handler; only safe if BaseUri owns its storage
+                Worker.LastSeen.Reset();     // restart the "last seen" clock
+            });
+
+            HttpReq.WriteResponse(HttpResponseCode::OK);
+        },
+        HttpVerb::kPost);
+}
+
+HttpOrchestratorService::~HttpOrchestratorService()
+{
+}
+
+const char*
+HttpOrchestratorService::BaseUri() const
+{
+    return "/orch/";
+}
+
+void
+HttpOrchestratorService::HandleRequest(HttpServerRequest& Request)
+{
+    if (m_Router.HandleRequest(Request) == false)  // this function only logs the miss; it writes no response itself
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/functionservice.h b/src/zencompute/include/zencompute/functionservice.h
new file mode 100644
index 000000000..1deb99fd5
--- /dev/null
+++
b/src/zencompute/include/zencompute/functionservice.h
@@ -0,0 +1,138 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zenstore/zenstore.h>
+# include <zenhttp/httpcommon.h>
+
+# include <filesystem>
+# include <memory>
+# include <string_view>
+# include <vector>
+
+namespace zen {
+class ChunkResolver;
+class CbObjectWriter;
+} // namespace zen
+
+namespace zen::compute {
+
+class ActionRecorder;
+class FunctionServiceSession;
+class IActionResultHandler;
+class LocalProcessRunner;
+class RemoteHttpRunner;
+struct RunnerAction;
+struct SubmitResult;
+
+/**
+ * Worker descriptor package plus its id; converts to false while WorkerId is IoHash::Zero
+ */
+struct WorkerDesc
+{
+    CbPackage Descriptor;
+    IoHash WorkerId{IoHash::Zero};
+
+    inline operator bool() const { return WorkerId != IoHash::Zero; }
+};
+
+/**
+ * Lambda style compute function service
+ *
+ * The responsibility of this class is to accept function execution requests, and
+ * schedule them using one or more FunctionRunner instances. It will basically always
+ * accept requests, queueing them if necessary, and then hand them off to runners
+ * as they become available.
+ *
+ * This is typically fronted by an API service that handles communication with clients.
+ */
+class FunctionServiceSession final
+{
+public:
+    FunctionServiceSession(ChunkResolver& InChunkResolver);
+    ~FunctionServiceSession();
+
+    void Shutdown();
+    bool IsHealthy();
+
+    // Worker registration and discovery
+
+    void RegisterWorker(CbPackage Worker);
+    [[nodiscard]] WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+    [[nodiscard]] std::vector<IoHash> GetKnownWorkerIds();
+
+    // Action runners
+
+    void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath);
+    void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName);
+
+    // Action submission
+
+    struct EnqueueResult
+    {
+        int Lsn = 0;  // 0 means "not accepted"; initialised so a default-constructed result tests false
+        CbObject ResponseMessage;
+
+        inline operator bool() const { return Lsn != 0; }
+    };
+
+    [[nodiscard]] EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int Priority);
+    [[nodiscard]] EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+
+    // Completed action tracking
+
+    [[nodiscard]] HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+    [[nodiscard]] HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+
+    void GetCompleted(CbWriter&);
+
+    // Action history tracking (note that this is separate from completed action tracking, and
+    // will include actions which have been retired and no longer have their results available)
+
+    struct ActionHistoryEntry
+    {
+        int Lsn = 0;
+        IoHash ActionId;
+        IoHash WorkerId;
+        CbObject ActionDescriptor;
+        bool Succeeded = false;
+        uint64_t Timestamps[5] = {};  // slot index is used as a RunnerAction::State value; presumably 5 matches the state count - TODO confirm
+    };
+
+    [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
+
+    // Stats reporting
+
+    void EmitStats(CbObjectWriter& Cbo);
+
+    // Recording
+
+    void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
+    void StopRecording();
+
+private:
+    void PostUpdate(RunnerAction* Action);
+
+    friend class FunctionRunner;
+    friend struct RunnerAction;
+
+    struct Impl;
+    std::unique_ptr<Impl> m_Impl;
+};
+
+void function_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httpfunctionservice.h b/src/zencompute/include/zencompute/httpfunctionservice.h
new file mode 100644
index 000000000..6e2344ae6
--- /dev/null
+++ b/src/zencompute/include/zencompute/httpfunctionservice.h
@@ -0,0 +1,73 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "zencompute/functionservice.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zentelemetry/stats.h>
+# include <zenhttp/httpserver.h>
+
+# include <deque>
+# include <filesystem>
+# include <unordered_map>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+class HttpFunctionService;
+class FunctionService;
+
+/**
+ * HTTP interface for compute function service
+ */
+class HttpFunctionService : public HttpService, public IHttpStatsProvider
+{
+public:
+    HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir);
+    ~HttpFunctionService();
+
+    void Shutdown();
+
+    virtual const char* BaseUri() const override;
+    virtual void HandleRequest(HttpServerRequest& Request) override;
+
+    // IHttpStatsProvider
+
+    virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+
+protected:
+    CidStore& m_CidStore;
+    IHttpStatsService& m_StatsService;
+    LoggerRef Log() { return m_Log; }
+
+private:
+    LoggerRef m_Log;
+    std::filesystem::path m_BaseDir;
+    HttpRequestRouter m_Router;
+    FunctionServiceSession m_FunctionService;
+
+    // Metrics
+
+    metrics::OperationTiming m_HttpRequests;
+};
+
+void httpfunction_forcelink();
+
+} // namespace zen::compute
+
+#endif //
ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h
new file mode 100644
index 000000000..168c6d7fe
--- /dev/null
+++ b/src/zencompute/include/zencompute/httporchestrator.h
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpserver.h>
+
+#include <string>
+#include <unordered_map>
+
+namespace zen::compute {
+
+/**
+ * Mock orchestrator service, for testing dynamic provisioning
+ *
+ * Workers POST their id and base URI to "announce"; "provision" reports every
+ * announced worker together with the time elapsed since it was last seen.
+ */
+
+class HttpOrchestratorService : public HttpService
+{
+public:
+    HttpOrchestratorService();
+    ~HttpOrchestratorService();
+
+    HttpOrchestratorService(const HttpOrchestratorService&) = delete;
+    HttpOrchestratorService& operator=(const HttpOrchestratorService&) = delete;
+
+    virtual const char* BaseUri() const override;
+    virtual void HandleRequest(HttpServerRequest& Request) override;
+
+private:
+    HttpRequestRouter m_Router;
+    LoggerRef m_Log;
+
+    struct KnownWorker
+    {
+        // Owning copy: the announce handler assigns this from a view into the request
+        // payload, which no longer exists once the handler has returned
+        std::string BaseUri;
+        Stopwatch LastSeen;  // reset on every announce
+    };
+
+    RwLock m_KnownWorkersLock;  // guards m_KnownWorkers
+    std::unordered_map<std::string, KnownWorker> m_KnownWorkers;
+};
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/recordingreader.h b/src/zencompute/include/zencompute/recordingreader.h
new file mode 100644
index 000000000..bf1aff125
--- /dev/null
+++ b/src/zencompute/include/zencompute/recordingreader.h
@@ -0,0 +1,127 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
/**
 * Abstract interface shared by the recording readers.
 *
 * A reader exposes the recorded workers and lets the caller iterate over the
 * recorded actions. Instances are non-copyable.
 */
class RecordingReaderBase
{
    RecordingReaderBase(const RecordingReaderBase&) = delete;
    RecordingReaderBase& operator=(const RecordingReaderBase&) = delete;

public:
    RecordingReaderBase() = default;

    // Pure virtual to keep the class abstract; a pure virtual destructor still
    // needs an out-of-line definition since derived destructors call it
    virtual ~RecordingReaderBase() = 0;

    /** Returns the recorded worker packages, keyed by worker id */
    virtual std::unordered_map<IoHash, CbPackage> ReadWorkers() = 0;

    /**
     * Invokes Callback once per recorded action.
     *
     * @param Callback          Receives the action object and its id
     * @param TargetParallelism Requested degree of parallelism; when an
     *                          implementation iterates in parallel the callback
     *                          may be invoked concurrently
     */
    virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) = 0;

    /** Returns the number of actions available for iteration */
    virtual size_t GetActionCount() const = 0;
};
// Implements both the reader interface and ChunkResolver, so the reader can
// be handed to consumers which need to look up chunks by content id
class RecordingReader : public RecordingReaderBase, public ChunkResolver
{
public:
    /** @param RecordingPath Directory holding the recording (stored as m_RecordingLogDir) */
    explicit RecordingReader(const std::filesystem::path& RecordingPath);
    ~RecordingReader();

    virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;

    virtual void   IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
                                  int                                                                  TargetParallelism) override;
    virtual size_t GetActionCount() const override;

private:
    std::filesystem::path m_RecordingLogDir;
    BasicFile             m_WorkerDataFile;
    BasicFile             m_ActionDataFile;

    // m_Gc must be declared before m_CidStore since the store is constructed
    // from it (members are initialized in declaration order)
    GcManager m_Gc;
    CidStore  m_CidStore{m_Gc};

    // ChunkResolver interface
    virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;

    // Location of one serialized action within the action data file
    struct ActionEntry
    {
        IoHash   ActionId;
        uint64_t Offset;
        uint64_t Size;
    };

    // Populated by ScanActions()
    std::vector<ActionEntry> m_Actions;

    void ScanActions();
};
FindChunkByCid(const IoHash& DecompressedId) override; + +private: + std::filesystem::path m_RecordingDir; + LocalResolver m_LocalResolver; + std::vector<std::filesystem::path> m_WorkDirs; + + CbPackage ReadAction(std::filesystem::path WorkDir); +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/zencompute.h b/src/zencompute/include/zencompute/zencompute.h new file mode 100644 index 000000000..6dc32eeea --- /dev/null +++ b/src/zencompute/include/zencompute/zencompute.h @@ -0,0 +1,11 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +namespace zen { + +void zencompute_forcelinktests(); + +} diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp new file mode 100644 index 000000000..9a27f3f3d --- /dev/null +++ b/src/zencompute/localrunner.cpp @@ -0,0 +1,722 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "localrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/system.h> +# include <zencore/scopeguard.h> +# include <zencore/timer.h> +# include <zenstore/cidstore.h> + +# include <span> + +namespace zen::compute { + +using namespace std::literals; + +LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("local_exec")) +, m_ChunkResolver(Resolver) +, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers")) +, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch")) +{ + SystemMetrics Sm = GetSystemMetricsForReporting(); + + m_MaxRunningActions = 
Sm.LogicalProcessorCount * 2; + + ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions); + + bool DidCleanup = false; + + if (std::filesystem::is_directory(m_ActionsPath)) + { + ZEN_INFO("Cleaning '{}'", m_ActionsPath); + + std::error_code Ec; + CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message()); + } + + DidCleanup = true; + } + + if (std::filesystem::is_directory(m_SandboxPath)) + { + ZEN_INFO("Cleaning '{}'", m_SandboxPath); + std::error_code Ec; + CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message()); + } + + DidCleanup = true; + } + + // We clean out all workers on startup since we can't know they are good. They could be bad + // due to tampering, malware (which I also mean to include AV and antimalware software) or + // other processes we have no control over + if (std::filesystem::is_directory(m_WorkerPath)) + { + ZEN_INFO("Cleaning '{}'", m_WorkerPath); + std::error_code Ec; + CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message()); + } + + DidCleanup = true; + } + + if (DidCleanup) + { + ZEN_INFO("Cleanup complete"); + } + + m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; + +# if ZEN_PLATFORM_WINDOWS + // Suppress any error dialogs caused by missing dependencies + UINT OldMode = ::SetErrorMode(0); + ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS); +# endif + + m_AcceptNewActions = true; +} + +LocalProcessRunner::~LocalProcessRunner() +{ + try + { + Shutdown(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception during local process runner shutdown: {}", Ex.what()); + } +} + +void +LocalProcessRunner::Shutdown() +{ + m_AcceptNewActions = false; + + m_MonitorThreadEnabled = false; + 
m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } + + CancelRunningActions(); +} + +std::filesystem::path +LocalProcessRunner::CreateNewSandbox() +{ + std::string UniqueId = std::to_string(++m_SandboxCounter); + std::filesystem::path Path = m_SandboxPath / UniqueId; + zen::CreateDirectories(Path); + + return Path; +} + +void +LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + if (m_DumpActions) + { + CbObject WorkerDescriptor = WorkerPackage.GetObject(); + const IoHash& WorkerId = WorkerPackage.GetObjectHash(); + + std::string UniqueId = fmt::format("worker_{}"sv, WorkerId); + std::filesystem::path Path = m_ActionsPath / UniqueId; + + zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer()); + + ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) { + std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString(); + zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed()); + }); + + ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path); + } +} + +size_t +LocalProcessRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + if (!m_AcceptNewActions) + { + return 0; + } + + size_t RunningCount = m_RunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector<SubmitResult> +LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +SubmitResult +LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action) +{ + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + + if (!m_AcceptNewActions) + { + return SubmitResult{.IsAccepted = 
false}; + } + + if (m_RunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + std::filesystem::path SandboxPath = CreateNewSandbox(); + + CbPackage WorkerPackage = Action->Worker.Descriptor; + + std::filesystem::path WorkerPath = ManifestWorker(Action->Worker); + + // Write out action + + zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer()); + + // Manifest inputs in sandbox + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()}; + IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid); + + if (!DataBuffer) + { + throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); + } + + zen::WriteFile(FilePath, DataBuffer); + }); + +# if ZEN_PLATFORM_WINDOWS + // Set up environment variables + + StringBuilder<1024> EnvironmentBlock; + + CbObject WorkerDescription = WorkerPackage.GetObject(); + + for (auto& It : WorkerDescription["environment"sv]) + { + EnvironmentBlock.Append(It.AsString()); + EnvironmentBlock.Append('\0'); + } + EnvironmentBlock.Append('\0'); + EnvironmentBlock.Append('\0'); + + // Execute process - this spawns the child process immediately without waiting + // for completion + + std::string_view ExecPath = WorkerDescription["path"sv].AsString(); + std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred(); + + ExtendableWideStringBuilder<512> CommandLine; + 
CommandLine.Append(L'"'); + CommandLine.Append(ExePath.c_str()); + CommandLine.Append(L'"'); + CommandLine.Append(L" -Build=build.action"); + + LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; + LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; + BOOL bInheritHandles = FALSE; + DWORD dwCreationFlags = 0; + + STARTUPINFO StartupInfo{}; + StartupInfo.cb = sizeof StartupInfo; + + PROCESS_INFORMATION ProcessInformation{}; + + ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str())); + + CommandLine.EnsureNulTerminated(); + + BOOL Success = CreateProcessW(nullptr, + CommandLine.Data(), + lpProcessAttributes, + lpThreadAttributes, + bInheritHandles, + dwCreationFlags, + (LPVOID)EnvironmentBlock.Data(), // Environment block + SandboxPath.c_str(), // Current directory + &StartupInfo, + /* out */ &ProcessInformation); + + if (!Success) + { + // TODO: this is probably not the best way to report failure. The return + // object should include a failure state and context + + zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); + } + + CloseHandle(ProcessInformation.hThread); + + Ref<RunningAction> NewAction{new RunningAction()}; + NewAction->Action = Action; + NewAction->ProcessHandle = ProcessInformation.hProcess; + NewAction->SandboxPath = std::move(SandboxPath); + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RunningMap[ActionLsn] = std::move(NewAction); + } + + Action->SetActionState(RunnerAction::State::Running); +# else + ZEN_UNUSED(ActionId); + + ZEN_NOT_IMPLEMENTED(); + + int ExitCode = 0; +# endif + + return SubmitResult{.IsAccepted = true}; +} + +size_t +LocalProcessRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RunningMap.size(); +} + +std::filesystem::path +LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker) +{ + RwLock::SharedLockScope _(m_WorkerLock); + + std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId); + + if 
(!std::filesystem::exists(WorkerDir)) + { + _.ReleaseNow(); + + RwLock::ExclusiveLockScope $(m_WorkerLock); + + if (!std::filesystem::exists(WorkerDir)) + { + ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {}); + } + } + + return WorkerDir; +} + +void +LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback) +{ + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + + CompressedBuffer Compressed; + + if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash)) + { + Compressed = Attachment->AsCompressedBinary(); + } + else + { + IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash); + + if (!DataBuffer) + { + throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash)); + } + + uint64_t DataRawSize = 0; + IoHash DataRawHash; + Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize); + + if (DataRawSize != Size) + { + throw std::runtime_error( + fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); + } + } + + ChunkReferenceCallback(ChunkHash, Compressed); + + std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()}; + + SharedBuffer Decompressed = Compressed.Decompress(); + zen::WriteFile(FilePath, Decompressed.AsIoBuffer()); +} + +void +LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback) +{ + CbObject WorkerDescription = WorkerPackage.GetObject(); + + // Manifest worker in Sandbox + + for (auto& It : WorkerDescription["executables"sv]) 
+ { + DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); + } + + for (auto& It : WorkerDescription["dirs"sv]) + { + std::string_view Name = It.AsString(); + std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()}; + zen::CreateDirectories(DirPath); + } + + for (auto& It : WorkerDescription["files"sv]) + { + DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); + } + + WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer()); +} + +CbPackage +LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath) +{ + std::filesystem::path OutputFile = SandboxPath / "build.output"; + FileContents OutputData = zen::ReadFile(OutputFile); + + if (OutputData.ErrorCode) + { + throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile)); + } + + CbPackage OutputPackage; + CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten()); + + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalRawAttachmentBytes = 0; + + Output.IterateAttachments([&](CbFieldView Field) { + IoHash Hash = Field.AsHash(); + std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; + FileContents ChunkData = zen::ReadFile(OutputPath); + + if (ChunkData.ErrorCode) + { + throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath)); + } + + uint64_t ChunkDataRawSize = 0; + IoHash ChunkDataHash; + CompressedBuffer AttachmentBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize); + + if (!AttachmentBuffer) + { + throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += ChunkDataRawSize; + + CbAttachment 
Attachment(std::move(AttachmentBuffer), ChunkDataHash); + OutputPackage.AddAttachment(Attachment); + }); + + OutputPackage.SetObject(Output); + + ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", + OutputPackage.GetAttachments().size(), + NiceBytes(TotalAttachmentBytes), + NiceBytes(TotalRawAttachmentBytes)); + + return OutputPackage; +} + +void +LocalProcessRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("LocalProcessRunner_Monitor"); + + auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); + + do + { + // On Windows it's possible to wait on process handles, so we wait for either a process to exit + // or for the monitor event to be signaled (which indicates we should check for cancellation + // or shutdown). This could be further improved by using a completion port and registering process + // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing + // with an enormous number of concurrent processes. + // + // On other platforms we just wait on the monitor event and poll for process exits at intervals. 
+# if ZEN_PLATFORM_WINDOWS + auto WaitOnce = [&] { + HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS]; + + uint32_t NumHandles = 0; + + WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle(); + + m_RunningLock.WithSharedLock([&] { + for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It) + { + Ref<RunningAction> Action = It->second; + + WaitHandles[NumHandles++] = Action->ProcessHandle; + } + }); + + DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000); + + // return true if a handle was signaled + return (WaitResult <= NumHandles); + }; +# else + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); }; +# endif + + while (!WaitOnce()) + { + if (m_MonitorThreadEnabled == false) + { + return; + } + + SweepRunningActions(); + } + + // Signal received + + SweepRunningActions(); + } while (m_MonitorThreadEnabled); +} + +void +LocalProcessRunner::CancelRunningActions() +{ + Stopwatch Timer; + std::unordered_map<int, Ref<RunningAction>> RunningMap; + + m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); + + if (RunningMap.empty()) + { + return; + } + + ZEN_INFO("cancelling all running actions"); + + // For expedience we initiate the process termination for all known + // processes before attempting to wait for them to exit. 
+ + std::vector<int> TerminatedLsnList; + + for (const auto& Kv : RunningMap) + { + Ref<RunningAction> Action = Kv.second; + + // Terminate running process + +# if ZEN_PLATFORM_WINDOWS + BOOL Success = TerminateProcess(Action->ProcessHandle, 222); + + if (Success) + { + TerminatedLsnList.push_back(Kv.first); + } + else + { + DWORD LastError = GetLastError(); + + if (LastError != ERROR_ACCESS_DENIED) + { + ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError)); + } + } +# else + ZEN_NOT_IMPLEMENTED("need to implement process termination"); +# endif + } + + // We only post results for processes we have terminated, in order + // to avoid multiple results getting posted for the same action + + for (int Lsn : TerminatedLsnList) + { + if (auto It = RunningMap.find(Lsn); It != RunningMap.end()) + { + Ref<RunningAction> Running = It->second; + +# if ZEN_PLATFORM_WINDOWS + if (Running->ProcessHandle != INVALID_HANDLE_VALUE) + { + DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000); + + if (WaitResult != WAIT_OBJECT_0) + { + ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult); + } + else + { + ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn); + } + } +# endif + + // Clean up and post error result + + DeleteDirectories(Running->SandboxPath); + Running->Action->SetActionState(RunnerAction::State::Failed); + } + } + + ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +void +LocalProcessRunner::SweepRunningActions() +{ + std::vector<Ref<RunningAction>> CompletedActions; + + m_RunningLock.WithExclusiveLock([&] { + // TODO: It would be good to not hold the exclusive lock while making + // system calls and other expensive operations. 
+ + for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;) + { + Ref<RunningAction> Action = It->second; + +# if ZEN_PLATFORM_WINDOWS + DWORD ExitCode = 0; + BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode); + + if (IsSuccess && ExitCode != STILL_ACTIVE) + { + CloseHandle(Action->ProcessHandle); + Action->ProcessHandle = INVALID_HANDLE_VALUE; + + CompletedActions.push_back(std::move(Action)); + It = m_RunningMap.erase(It); + } + else + { + ++It; + } +# else + // TODO: implement properly for Mac/Linux + + ZEN_UNUSED(Action); +# endif + } + }); + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (Ref<RunningAction> Running : CompletedActions) + { + const int ActionLsn = Running->Action->ActionLsn; + + if (Running->ExitCode == 0) + { + try + { + // Gather outputs + + CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath); + + Running->Action->SetResult(std::move(OutputPackage)); + Running->Action->SetActionState(RunnerAction::State::Completed); + + // We can delete the files at this point + if (!DeleteDirectories(Running->SandboxPath)) + { + ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath); + } + + // Success -- continue with next iteration of the loop + continue; + } + catch (std::exception& Ex) + { + ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what()); + } + } + + // Failed - for now this is indicated with an empty package in + // the results map. We can clean out the sandbox directory immediately. 
+ + std::error_code Ec; + DeleteDirectories(Running->SandboxPath, Ec); + + if (Ec) + { + ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message()); + } + + Running->Action->SetActionState(RunnerAction::State::Failed); + } +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h new file mode 100644 index 000000000..35f464805 --- /dev/null +++ b/src/zencompute/localrunner.h @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include <zencore/thread.h> +# include <zencore/zencore.h> +# include <zenstore/cidstore.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/logging.h> + +# include <atomic> +# include <filesystem> +# include <thread> + +namespace zen { +class CbPackage; +} + +namespace zen::compute { + +/** Direct process spawner + + This runner simply sets up a directory structure for each job and + creates a process to perform the computation in it. It is not very + efficient and is intended mostly for testing. 
+ + */ + +class LocalProcessRunner : public FunctionRunner +{ + LocalProcessRunner(LocalProcessRunner&&) = delete; + LocalProcessRunner& operator=(LocalProcessRunner&&) = delete; + +public: + LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir); + ~LocalProcessRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; + [[nodiscard]] virtual bool IsHealthy() override { return true; } + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + struct RunningAction : public RefCounted + { + Ref<RunnerAction> Action; + void* ProcessHandle = nullptr; + int ExitCode = 0; + std::filesystem::path SandboxPath; + }; + + std::atomic_bool m_AcceptNewActions; + ChunkResolver& m_ChunkResolver; + RwLock m_WorkerLock; + std::filesystem::path m_WorkerPath; + std::atomic<int32_t> m_SandboxCounter = 0; + std::filesystem::path m_SandboxPath; + int32_t m_MaxRunningActions = 64; // arbitrary limit for testing + + // if used in conjuction with m_ResultsLock, this lock must be taken *after* + // m_ResultsLock to avoid deadlocks + RwLock m_RunningLock; + std::unordered_map<int, Ref<RunningAction>> m_RunningMap; + + std::thread m_MonitorThread; + std::atomic<bool> m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + void SweepRunningActions(); + void CancelRunningActions(); + + std::filesystem::path CreateNewSandbox(); + void ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback); + std::filesystem::path 
ManifestWorker(const WorkerDesc& Worker); + CbPackage GatherActionOutputs(std::filesystem::path SandboxPath); + + void DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback); +}; + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp new file mode 100644 index 000000000..1c1a119cf --- /dev/null +++ b/src/zencompute/recordingreader.cpp @@ -0,0 +1,335 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/recordingreader.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# define ZEN_CONCRT_AVAILABLE 1 +#else +# define ZEN_CONCRT_AVAILABLE 0 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +# if ZEN_PLATFORM_WINDOWS +# define ZEN_BUILD_ACTION L"Build.action" +# define ZEN_WORKER_UCB L"worker.ucb" +# else +# define ZEN_BUILD_ACTION "Build.action" +# define ZEN_WORKER_UCB "worker.ucb" +# endif + +////////////////////////////////////////////////////////////////////////// + +struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor +{ + virtual void VisitFile(const std::filesystem::path& Parent, + const path_view& File, + uint64_t FileSize, + uint32_t NativeModeOrAttributes, + uint64_t NativeModificationTick) + { + ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick); + + if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0) + { + WorkDirs.push_back(Parent); + } + else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0) + { + 
WorkerDirs.push_back(Parent); + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes) + { + ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes); + + return true; + } + + std::vector<std::filesystem::path> WorkerDirs; + std::vector<std::filesystem::path> WorkDirs; +}; + +////////////////////////////////////////////////////////////////////////// + +void +IterateOverArray(auto Array, auto Func, int TargetParallelism) +{ +# if ZEN_CONCRT_AVAILABLE + if (TargetParallelism > 1) + { + concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism); + concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); }); + + return; + } +# else + ZEN_UNUSED(TargetParallelism); +# endif + + for (const auto& Item : Array) + { + Func(Item); + } +} + +////////////////////////////////////////////////////////////////////////// + +RecordingReaderBase::~RecordingReaderBase() = default; + +////////////////////////////////////////////////////////////////////////// + +RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath) +{ + CidStoreConfiguration CidConfig; + CidConfig.RootDirectory = m_RecordingLogDir / "cid"; + CidConfig.HugeValueThreshold = 128 * 1024 * 1024; + + m_CidStore.Initialize(CidConfig); +} + +RecordingReader::~RecordingReader() +{ + m_CidStore.Flush(); +} + +size_t +RecordingReader::GetActionCount() const +{ + return m_Actions.size(); +} + +IoBuffer +RecordingReader::FindChunkByCid(const IoHash& DecompressedId) +{ + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId)) + { + return Chunk; + } + + ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId); + + return {}; +} + +std::unordered_map<zen::IoHash, zen::CbPackage> +RecordingReader::ReadWorkers() +{ + std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap; + + { + CbObjectFromFile TocFile = 
LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc"); + CbObject Toc = TocFile.Object; + + m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead); + + ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); + + for (auto& It : Toc["toc"]) + { + CbArrayView Entry = It.AsArrayView(); + CbFieldViewIterator Vit = Entry.CreateViewIterator(); + + const IoHash WorkerId = Vit++->AsHash(); + const uint64_t Offset = Vit++->AsInt64(0); + const uint64_t Size = Vit++->AsInt64(0); + + IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size); + CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange); + CbPackage& WorkerPkg = WorkerMap[WorkerId]; + WorkerPkg.SetObject(WorkerDesc); + + WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid); + + if (AttachmentData) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); + } + }); + } + } + + // Scan actions as well (this should be called separately, ideally) + + ScanActions(); + + return WorkerMap; +} + +void +RecordingReader::ScanActions() +{ + CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc"); + CbObject Toc = TocFile.Object; + + m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead); + + ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); + + for (auto& It : Toc["toc"]) + { + CbArrayView ArrayEntry = It.AsArrayView(); + CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator(); + + ActionEntry Entry; + Entry.ActionId = Vit++->AsHash(); + Entry.Offset = Vit++->AsInt64(0); + Entry.Size = Vit++->AsInt64(0); + + m_Actions.push_back(Entry); + } +} + +void +RecordingReader::IterateActions(std::function<void(CbObject 
ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) +{ + IterateOverArray( + m_Actions, + [&](const ActionEntry& Entry) { + CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size)); + + Callback(ActionDesc, Entry.ActionId); + }, + TargetParallelism); +} + +////////////////////////////////////////////////////////////////////////// + +IoBuffer +LocalResolver::FindChunkByCid(const IoHash& DecompressedId) +{ + RwLock::SharedLockScope _(MapLock); + if (auto It = Attachments.find(DecompressedId); It != Attachments.end()) + { + return It->second; + } + + return {}; +} + +void +LocalResolver::Add(const IoHash& Cid, IoBuffer Data) +{ + RwLock::ExclusiveLockScope _(MapLock); + Data.SetContentType(ZenContentType::kCompressedBinary); + Attachments[Cid] = Data; +} + +/// + +UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath) +{ +} + +UeRecordingReader::~UeRecordingReader() +{ +} + +size_t +UeRecordingReader::GetActionCount() const +{ + return m_WorkDirs.size(); +} + +IoBuffer +UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId) +{ + return m_LocalResolver.FindChunkByCid(DecompressedId); +} + +std::unordered_map<zen::IoHash, zen::CbPackage> +UeRecordingReader::ReadWorkers() +{ + std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap; + + FileSystemTraversal Traversal; + RecordingTreeVisitor Visitor; + Traversal.TraverseFileSystem(m_RecordingDir, Visitor); + + m_WorkDirs = std::move(Visitor.WorkDirs); + + for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs) + { + CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb"); + CbObject WorkerDesc = WorkerFile.Object; + const IoHash& WorkerId = WorkerFile.Hash; + CbPackage& WorkerPkg = WorkerMap[WorkerId]; + WorkerPkg.SetObject(WorkerDesc); + + WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = 
AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten(); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); + }); + } + + return WorkerMap; +} + +void +UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget) +{ + IterateOverArray( + m_WorkDirs, + [&](const std::filesystem::path& WorkDir) { + CbPackage WorkPackage = ReadAction(WorkDir); + CbObject ActionObject = WorkPackage.GetObject(); + const IoHash& ActionId = WorkPackage.GetObjectHash(); + + Callback(ActionObject, ActionId); + }, + ParallelismTarget); +} + +CbPackage +UeRecordingReader::ReadAction(std::filesystem::path WorkDir) +{ + CbPackage WorkPackage; + std::filesystem::path WorkDescPath = WorkDir / "Build.action"; + CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath); + CbObject& ActionObject = ActionFile.Object; + + WorkPackage.SetObject(ActionObject); + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten(); + + m_LocalResolver.Add(AttachmentCid, AttachmentData); + + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + ZEN_ASSERT(AttachmentCid == RawHash); + WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash)); + }); + + return WorkPackage; +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/remotehttprunner.cpp b/src/zencompute/remotehttprunner.cpp new file mode 100644 index 000000000..98ced5fe8 --- /dev/null 
+++ b/src/zencompute/remotehttprunner.cpp @@ -0,0 +1,457 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remotehttprunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/scopeguard.h> +# include <zenhttp/httpcommon.h> +# include <zenstore/cidstore.h> + +# include <span> + +////////////////////////////////////////////////////////////////////////// + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("http_exec")) +, m_ChunkResolver{InChunkResolver} +, m_BaseUrl{fmt::format("{}/apply", HostName)} +, m_Http(m_BaseUrl) +{ + m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; +} + +RemoteHttpRunner::~RemoteHttpRunner() +{ + Shutdown(); +} + +void +RemoteHttpRunner::Shutdown() +{ + // TODO: should cleanly drain/cancel pending work + + m_MonitorThreadEnabled = false; + m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } +} + +void +RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + CbPackage WorkerDesc = WorkerPackage; + + std::string WorkerUrl = fmt::format("/workers/{}", WorkerId); + + HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl); + + if (WorkerResponse.StatusCode == HttpResponseCode::NotFound) + { + HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject()); + + if 
(DescResponse.StatusCode == HttpResponseCode::NotFound) + { + CbPackage Pkg = WorkerDesc; + + // Build response package by sending only the attachments + // the other end needs. We start with the full package and + // remove the attachments which are not needed. + + { + std::unordered_set<IoHash> Needed; + + CbObject Response = DescResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + Needed.insert(NeedHash); + } + + std::unordered_set<IoHash> ToRemove; + + for (const CbAttachment& Attachment : Pkg.GetAttachments()) + { + const IoHash& Hash = Attachment.GetHash(); + + if (Needed.find(Hash) == Needed.end()) + { + ToRemove.insert(Hash); + } + } + + for (const IoHash& Hash : ToRemove) + { + int RemovedCount = Pkg.RemoveAttachment(Hash); + + ZEN_ASSERT(RemovedCount == 1); + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg); + + if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + } + else if (!IsHttpSuccessCode(DescResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + else + { + ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent); + } + } + else if (WorkerResponse.StatusCode == HttpResponseCode::OK) + { + // Already known from a previous run + } + else if (!IsHttpSuccessCode(WorkerResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})", + WorkerId, + m_Http.GetBaseUri(), + WorkerUrl, + (int)WorkerResponse.StatusCode, + ToString(WorkerResponse.StatusCode)); + + // TODO: propagate error + } +} + +size_t +RemoteHttpRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + 
size_t RunningCount = m_RemoteRunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector<SubmitResult> +RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +SubmitResult +RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) +{ + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + // Enqueue job + + CbObject Result; + + HttpClient::Response WorkResponse = m_Http.Post("/jobs", ActionObj); + HttpResponseCode WorkResponseCode = WorkResponse.StatusCode; + + if (WorkResponseCode == HttpResponseCode::OK) + { + Result = WorkResponse.AsObject(); + } + else if (WorkResponseCode == HttpResponseCode::NotFound) + { + // Not all attachments are present + + // Build response package including all required attachments + + CbPackage Pkg; + Pkg.SetObject(ActionObj); + + CbObject Response = WorkResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + { + uint64_t DataRawSize = 0; + IoHash DataRawHash; + CompressedBuffer Compressed = + 
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); + + ZEN_ASSERT(DataRawHash == NeedHash); + + Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + } + else + { + // No such attachment + + return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post("/jobs", Pkg); + + if (!PayloadResponse) + { + ZEN_WARN("unable to register payloads for action {} at {}/jobs", ActionId, m_Http.GetBaseUri()); + + // TODO: include more information about the failure in the response + + return {.IsAccepted = false, .Reason = "HTTP request failed"}; + } + else if (PayloadResponse.StatusCode == HttpResponseCode::OK) + { + Result = PayloadResponse.AsObject(); + } + else + { + // Unexpected response + + const int ResponseStatusCode = (int)PayloadResponse.StatusCode; + + ZEN_WARN("unable to register payloads for action {} at {}/jobs (error: {} {})", + ActionId, + m_Http.GetBaseUri(), + ResponseStatusCode, + ToString(ResponseStatusCode)); + + return {.IsAccepted = false, + .Reason = fmt::format("unexpected response code {} {} from {}/jobs", + ResponseStatusCode, + ToString(ResponseStatusCode), + m_Http.GetBaseUri())}; + } + } + + if (Result) + { + if (const int32_t LsnField = Result["lsn"].AsInt32(0)) + { + HttpRunningAction NewAction; + NewAction.Action = Action; + NewAction.RemoteActionLsn = LsnField; + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RemoteRunningMap[LsnField] = std::move(NewAction); + } + + ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn); + + Action->SetActionState(RunnerAction::State::Running); + + return SubmitResult{.IsAccepted = true}; + } + } + + return {}; +} + +bool +RemoteHttpRunner::IsHealthy() +{ + if (HttpClient::Response Ready = m_Http.Get("/ready")) + { + return true; + } + else + { + // TODO: use response to propagate context + 
return false; + } +} + +size_t +RemoteHttpRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RemoteRunningMap.size(); +} + +void +RemoteHttpRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("RemoteHttpRunner_Monitor"); + + do + { + const int NormalWaitingTime = 1000; + int WaitTimeMs = NormalWaitingTime; + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); }; + auto SweepOnce = [&] { + const size_t RetiredCount = SweepRunningActions(); + + m_RunningLock.WithSharedLock([&] { + if (m_RemoteRunningMap.size() > 16) + { + WaitTimeMs = NormalWaitingTime / 4; + } + else + { + if (RetiredCount) + { + WaitTimeMs = NormalWaitingTime / 2; + } + else + { + WaitTimeMs = NormalWaitingTime; + } + } + }); + }; + + while (!WaitOnce()) + { + SweepOnce(); + } + + // Signal received - this may mean we should quit + + SweepOnce(); + } while (m_MonitorThreadEnabled); +} + +size_t +RemoteHttpRunner::SweepRunningActions() +{ + std::vector<HttpRunningAction> CompletedActions; + + // Poll remote for list of completed actions + + HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv); + + if (CbObject Completed = ResponseCompleted.AsObject()) + { + for (auto& FieldIt : Completed["completed"sv]) + { + const int32_t CompleteLsn = FieldIt.AsInt32(); + + if (HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn))) + { + m_RunningLock.WithExclusiveLock([&] { + if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end()) + { + HttpRunningAction CompletedAction = std::move(CompleteIt->second); + CompletedAction.ActionResults = ResponseJob.AsPackage(); + CompletedAction.Success = true; + + CompletedActions.push_back(std::move(CompletedAction)); + m_RemoteRunningMap.erase(CompleteIt); + } + else + { + // we received a completion notice for an action we don't know about, + // this can happen if the runner is used by multiple upstream schedulers, + // 
or if this compute node was recently restarted and lost track of + // previously scheduled actions + } + }); + } + } + + if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView()) + { + // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0)) + if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0)) + { + const int32_t NewCap = zen::Max(4, CpuCount); + + if (m_MaxRunningActions > NewCap) + { + ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions); + + m_MaxRunningActions = NewCap; + } + } + } + } + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (HttpRunningAction& HttpAction : CompletedActions) + { + const int ActionLsn = HttpAction.Action->ActionLsn; + + if (HttpAction.Success) + { + ZEN_DEBUG("completed: {} LSN {} (remote LSN {})", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn); + + HttpAction.Action->SetActionState(RunnerAction::State::Completed); + + HttpAction.Action->SetResult(std::move(HttpAction.ActionResults)); + } + else + { + HttpAction.Action->SetActionState(RunnerAction::State::Failed); + } + } + + return CompletedActions.size(); +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/remotehttprunner.h b/src/zencompute/remotehttprunner.h new file mode 100644 index 000000000..1e885da3d --- /dev/null +++ b/src/zencompute/remotehttprunner.h @@ -0,0 +1,80 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include <zencore/compactbinarypackage.h> +# include <zencore/logging.h> +# include <zencore/zencore.h> +# include <zenhttp/httpclient.h> + +# include <atomic> +# include <filesystem> +# include <thread> + +namespace zen { +class CidStore; +} + +namespace zen::compute { + +/** HTTP-based runner + + This implements a DDC remote compute execution strategy via REST API + + */ + +class RemoteHttpRunner : public FunctionRunner +{ + RemoteHttpRunner(RemoteHttpRunner&&) = delete; + RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete; + +public: + RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName); + ~RemoteHttpRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; + [[nodiscard]] virtual bool IsHealthy() override; + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + +private: + LoggerRef m_Log; + ChunkResolver& m_ChunkResolver; + std::string m_BaseUrl; + HttpClient m_Http; + + int32_t m_MaxRunningActions = 256; // arbitrary limit for testing + + struct HttpRunningAction + { + Ref<RunnerAction> Action; + int RemoteActionLsn = 0; // Remote LSN + bool Success = false; + CbPackage ActionResults; + }; + + RwLock m_RunningLock; + std::unordered_map<int, HttpRunningAction> m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn + + std::thread m_MonitorThread; + std::atomic<bool> m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + size_t 
SweepRunningActions(); +}; + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/xmake.lua b/src/zencompute/xmake.lua new file mode 100644 index 000000000..c710b662d --- /dev/null +++ b/src/zencompute/xmake.lua @@ -0,0 +1,11 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target('zencompute') + set_kind("static") + set_group("libs") + add_headerfiles("**.h") + add_files("**.cpp") + add_includedirs("include", {public=true}) + add_deps("zencore", "zenstore", "zenutil", "zennet", "zenhttp") + add_packages("vcpkg::gsl-lite") + add_packages("vcpkg::spdlog", "vcpkg::cxxopts") diff --git a/src/zencompute/zencompute.cpp b/src/zencompute/zencompute.cpp new file mode 100644 index 000000000..633250f4e --- /dev/null +++ b/src/zencompute/zencompute.cpp @@ -0,0 +1,12 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/zencompute.h" + +namespace zen { + +void +zencompute_forcelinktests() +{ +} + +} // namespace zen diff --git a/src/zencore/include/zencore/system.h b/src/zencore/include/zencore/system.h index aec2e0ce4..bf3c15d3d 100644 --- a/src/zencore/include/zencore/system.h +++ b/src/zencore/include/zencore/system.h @@ -25,6 +25,7 @@ struct SystemMetrics uint64_t AvailVirtualMemoryMiB = 0; uint64_t PageFileMiB = 0; uint64_t AvailPageFileMiB = 0; + float CpuUsagePercent = 0.0f; }; SystemMetrics GetSystemMetrics(); diff --git a/src/zencore/system.cpp b/src/zencore/system.cpp index e92691781..267c87e12 100644 --- a/src/zencore/system.cpp +++ b/src/zencore/system.cpp @@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START # include <iphlpapi.h> # include <winsock2.h> +# include <pdh.h> +# pragma comment(lib, "pdh.lib") ZEN_THIRD_PARTY_INCLUDES_END #elif ZEN_PLATFORM_LINUX # include <sys/utsname.h> @@ -65,55 +67,98 @@ GetSystemMetrics() // Determine physical core count - DWORD BufferSize = 0; - BOOL Result = GetLogicalProcessorInformationEx(RelationAll, nullptr, &BufferSize); - if (int32_t Error = GetLastError(); Error != 
ERROR_INSUFFICIENT_BUFFER) { - ThrowSystemError(Error, "Failed to get buffer size for logical processor information"); - } - - PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX Buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)Memory::Alloc(BufferSize); + DWORD BufferSize = 0; + BOOL Result = GetLogicalProcessorInformationEx(RelationAll, nullptr, &BufferSize); + if (int32_t Error = GetLastError(); Error != ERROR_INSUFFICIENT_BUFFER) + { + ThrowSystemError(Error, "Failed to get buffer size for logical processor information"); + } - Result = GetLogicalProcessorInformationEx(RelationAll, Buffer, &BufferSize); - if (!Result) - { - Memory::Free(Buffer); - throw std::runtime_error("Failed to get logical processor information"); - } + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX Buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)Memory::Alloc(BufferSize); - DWORD ProcessorPkgCount = 0; - DWORD ProcessorCoreCount = 0; - DWORD ByteOffset = 0; - while (ByteOffset + sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) <= BufferSize) - { - const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX& Slpi = Buffer[ByteOffset / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)]; - if (Slpi.Relationship == RelationProcessorCore) + Result = GetLogicalProcessorInformationEx(RelationAll, Buffer, &BufferSize); + if (!Result) { - ProcessorCoreCount++; + Memory::Free(Buffer); + throw std::runtime_error("Failed to get logical processor information"); } - else if (Slpi.Relationship == RelationProcessorPackage) + + DWORD ProcessorPkgCount = 0; + DWORD ProcessorCoreCount = 0; + DWORD LogicalProcessorCount = 0; + + BYTE* Ptr = reinterpret_cast<BYTE*>(Buffer); + BYTE* const End = Ptr + BufferSize; + while (Ptr < End) { - ProcessorPkgCount++; + const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX& Slpi = *reinterpret_cast<const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*>(Ptr); + if (Slpi.Relationship == RelationProcessorCore) + { + ++ProcessorCoreCount; + + // Count logical processors (threads) across all processor groups for this 
core. + // Each core entry lists one GROUP_AFFINITY per group it spans; each set bit + // in the Mask represents one logical processor (HyperThreading sibling). + for (WORD g = 0; g < Slpi.Processor.GroupCount; ++g) + { + LogicalProcessorCount += static_cast<DWORD>(__popcnt64(Slpi.Processor.GroupMask[g].Mask)); + } + } + else if (Slpi.Relationship == RelationProcessorPackage) + { + ++ProcessorPkgCount; + } + Ptr += Slpi.Size; } - ByteOffset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX); - } - Metrics.CoreCount = ProcessorCoreCount; - Metrics.CpuCount = ProcessorPkgCount; + Metrics.CoreCount = ProcessorCoreCount; + Metrics.CpuCount = ProcessorPkgCount; + Metrics.LogicalProcessorCount = LogicalProcessorCount; - Memory::Free(Buffer); + Memory::Free(Buffer); + } // Query memory status - MEMORYSTATUSEX MemStatus{.dwLength = sizeof(MEMORYSTATUSEX)}; - GlobalMemoryStatusEx(&MemStatus); + { + MEMORYSTATUSEX MemStatus{.dwLength = sizeof(MEMORYSTATUSEX)}; + GlobalMemoryStatusEx(&MemStatus); + + Metrics.SystemMemoryMiB = MemStatus.ullTotalPhys / 1024 / 1024; + Metrics.AvailSystemMemoryMiB = MemStatus.ullAvailPhys / 1024 / 1024; + Metrics.VirtualMemoryMiB = MemStatus.ullTotalVirtual / 1024 / 1024; + Metrics.AvailVirtualMemoryMiB = MemStatus.ullAvailVirtual / 1024 / 1024; + Metrics.PageFileMiB = MemStatus.ullTotalPageFile / 1024 / 1024; + Metrics.AvailPageFileMiB = MemStatus.ullAvailPageFile / 1024 / 1024; + } + + // Query CPU usage using PDH + // + // TODO: This should be changed to not require a Sleep, perhaps by using some + // background metrics gathering mechanism. 
+ + { + PDH_HQUERY QueryHandle = nullptr; + PDH_HCOUNTER CounterHandle = nullptr; - Metrics.SystemMemoryMiB = MemStatus.ullTotalPhys / 1024 / 1024; - Metrics.AvailSystemMemoryMiB = MemStatus.ullAvailPhys / 1024 / 1024; - Metrics.VirtualMemoryMiB = MemStatus.ullTotalVirtual / 1024 / 1024; - Metrics.AvailVirtualMemoryMiB = MemStatus.ullAvailVirtual / 1024 / 1024; - Metrics.PageFileMiB = MemStatus.ullTotalPageFile / 1024 / 1024; - Metrics.AvailPageFileMiB = MemStatus.ullAvailPageFile / 1024 / 1024; + if (PdhOpenQueryW(nullptr, 0, &QueryHandle) == ERROR_SUCCESS) + { + if (PdhAddEnglishCounterW(QueryHandle, L"\\Processor(_Total)\\% Processor Time", 0, &CounterHandle) == ERROR_SUCCESS) + { + PdhCollectQueryData(QueryHandle); + Sleep(100); + PdhCollectQueryData(QueryHandle); + + PDH_FMT_COUNTERVALUE CounterValue; + if (PdhGetFormattedCounterValue(CounterHandle, PDH_FMT_DOUBLE, nullptr, &CounterValue) == ERROR_SUCCESS) + { + Metrics.CpuUsagePercent = static_cast<float>(CounterValue.doubleValue); + } + } + PdhCloseQuery(QueryHandle); + } + } return Metrics; } @@ -190,6 +235,39 @@ GetSystemMetrics() } } + // Query CPU usage + Metrics.CpuUsagePercent = 0.0f; + if (FILE* Stat = fopen("/proc/stat", "r")) + { + char Line[256]; + unsigned long User, Nice, System, Idle, IoWait, Irq, SoftIrq; + static unsigned long PrevUser = 0, PrevNice = 0, PrevSystem = 0, PrevIdle = 0, PrevIoWait = 0, PrevIrq = 0, PrevSoftIrq = 0; + + if (fgets(Line, sizeof(Line), Stat)) + { + if (sscanf(Line, "cpu %lu %lu %lu %lu %lu %lu %lu", &User, &Nice, &System, &Idle, &IoWait, &Irq, &SoftIrq) == 7) + { + unsigned long TotalDelta = (User + Nice + System + Idle + IoWait + Irq + SoftIrq) - + (PrevUser + PrevNice + PrevSystem + PrevIdle + PrevIoWait + PrevIrq + PrevSoftIrq); + unsigned long IdleDelta = Idle - PrevIdle; + + if (TotalDelta > 0) + { + Metrics.CpuUsagePercent = 100.0f * (TotalDelta - IdleDelta) / TotalDelta; + } + + PrevUser = User; + PrevNice = Nice; + PrevSystem = System; + PrevIdle = Idle; + 
PrevIoWait = IoWait; + PrevIrq = Irq; + PrevSoftIrq = SoftIrq; + } + } + fclose(Stat); + } + // Get memory information long Pages = sysconf(_SC_PHYS_PAGES); long PageSize = sysconf(_SC_PAGE_SIZE); @@ -270,6 +348,25 @@ GetSystemMetrics() sysctlbyname("hw.packages", &Packages, &Size, nullptr, 0); Metrics.CpuCount = Packages > 0 ? Packages : 1; + // Query CPU usage using host_statistics64 + Metrics.CpuUsagePercent = 0.0f; + host_cpu_load_info_data_t CpuLoad; + mach_msg_type_number_t CpuCount = sizeof(CpuLoad) / sizeof(natural_t); + if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, (host_info_t)&CpuLoad, &CpuCount) == KERN_SUCCESS) + { + unsigned long TotalTicks = 0; + for (int i = 0; i < CPU_STATE_MAX; ++i) + { + TotalTicks += CpuLoad.cpu_ticks[i]; + } + + if (TotalTicks > 0) + { + unsigned long IdleTicks = CpuLoad.cpu_ticks[CPU_STATE_IDLE]; + Metrics.CpuUsagePercent = 100.0f * (TotalTicks - IdleTicks) / TotalTicks; + } + } + // Get memory information uint64_t MemSize = 0; Size = sizeof(MemSize); diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 14896c803..c640ba90b 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -331,6 +331,8 @@ public: virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override; virtual bool TryGetRanges(HttpRanges& Ranges) override; + void LogRequest(HttpMessageResponseRequest* Response); + using HttpServerRequest::WriteResponse; HttpSysServerRequest(const HttpSysServerRequest&) = delete; @@ -429,7 +431,8 @@ public: virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override; void SuppressResponseBody(); // typically used for HEAD requests - inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } + inline uint16_t GetResponseCode() const { return m_ResponseCode; } + inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } private: 
eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks; @@ -1886,7 +1889,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_ASSERT(IsHandled() == false); - auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); if (SuppressBody()) { @@ -1904,6 +1907,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) # endif SetIsHandled(); + LogRequest(Response); } void @@ -1913,7 +1917,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy ZEN_ASSERT(IsHandled() == false); - auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); if (SuppressBody()) { @@ -1931,6 +1935,20 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy # endif SetIsHandled(); + LogRequest(Response); +} + +void +HttpSysServerRequest::LogRequest(HttpMessageResponseRequest* Response) +{ + if (ShouldLogRequest()) + { + ZEN_INFO("{} {} {} -> {}", + ToString(RequestVerb()), + m_UriUtf8.c_str(), + Response->GetResponseCode(), + NiceBytes(Response->GetResponseBodySize())); + } } void @@ -1959,6 +1977,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy # endif SetIsHandled(); + LogRequest(Response); } void diff --git a/src/zennet/beacon.cpp b/src/zennet/beacon.cpp new file mode 100644 index 000000000..394a4afbb --- /dev/null +++ b/src/zennet/beacon.cpp @@ -0,0 +1,170 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+
+#include <zennet/beacon.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/session.h>
+#include <zencore/uid.h>
+
+#include <fmt/format.h>
+#include <asio.hpp>
+#include <map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+/** Implementation behind FsBeacon
+
+    Each group is a directory below the share root. A session registers with
+    a group by creating a marker file in that directory; the file is named
+    after the session id and contains the metadata passed to AddGroup().
+ */
+struct FsBeacon::Impl
+{
+    Impl(std::filesystem::path ShareRoot);
+    ~Impl();
+
+    void EnsureValid();
+
+    void AddGroup(std::string_view GroupId, CbObject Metadata);
+    void ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions);
+    void ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata);
+
+private:
+    std::filesystem::path m_ShareRoot;
+    zen::Oid              m_SessionId;
+
+    // Per-group registration state. The marker file handle is stored here so
+    // it stays open for as long as the registration entry exists.
+    struct GroupData
+    {
+        CbObject  Metadata;
+        BasicFile LockFile;
+    };
+
+    std::map<std::string, GroupData> m_Registration;
+
+    // Marker path layout: <ShareRoot>/<GroupId>/<SessionId as string>
+    std::filesystem::path GetSessionMarkerPath(std::string_view GroupId, const Oid& SessionId)
+    {
+        Oid::String_t SessionIdString;
+        SessionId.ToString(SessionIdString);
+
+        return m_ShareRoot / GroupId / SessionIdString;
+    }
+};
+
+FsBeacon::Impl::Impl(std::filesystem::path ShareRoot) : m_ShareRoot(ShareRoot), m_SessionId(GetSessionId())
+{
+}
+
+FsBeacon::Impl::~Impl()
+{
+}
+
+void
+FsBeacon::Impl::EnsureValid()
+{
+}
+
+/** Register this session in a group
+
+    Creates the group directory if needed, then creates (truncating) this
+    session's marker file and writes Metadata into it.
+
+    Throws std::system_error if the marker file cannot be opened or written.
+ */
+void
+FsBeacon::Impl::AddGroup(std::string_view GroupId, CbObject Metadata)
+{
+    zen::CreateDirectories(m_ShareRoot / GroupId);
+    std::filesystem::path MarkerFile = GetSessionMarkerPath(GroupId, m_SessionId);
+
+    GroupData& Group = m_Registration[std::string(GroupId)];
+
+    Group.Metadata = Metadata;
+
+    // NOTE(review): kPreventDelete/kPreventWrite presumably stop other
+    // processes from deleting or modifying the marker while it is held open,
+    // which is what ScanGroup() relies on - confirm against BasicFile
+    std::error_code Ec;
+    Group.LockFile.Open(MarkerFile,
+                        BasicFile::Mode::kTruncate | BasicFile::Mode::kPreventDelete |
+                            BasicFile::Mode::kPreventWrite /* | BasicFile::Mode::kDeleteOnClose */,
+                        Ec);
+
+    if (Ec)
+    {
+        throw std::system_error(Ec, fmt::format("failed to open beacon marker file '{}' for write", MarkerFile));
+    }
+
+    Group.LockFile.WriteAll(Metadata.GetBuffer().AsIoBuffer(), Ec);
+
+    if (Ec)
+    {
+        throw std::system_error(Ec, fmt::format("failed to write to beacon marker file '{}'", MarkerFile));
+    }
+
+    Group.LockFile.Flush();
+}
+
+/** Collect the sessions currently registered in a group
+
+    Every file in the group directory is passed to std::filesystem::remove().
+    A file which could be removed is gone and is not reported. A file which
+    could not be removed is reported in OutSessions if its name parses as a
+    session id and it has a non-zero size.
+ */
+void
+FsBeacon::Impl::ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions)
+{
+    DirectoryContent Dc;
+    zen::GetDirectoryContent(m_ShareRoot / GroupId, zen::DirectoryContentFlags::IncludeFiles, /* out */ Dc);
+
+    for (const std::filesystem::path& FilePath : Dc.Files)
+    {
+        std::error_code Ec;
+        if (std::filesystem::remove(FilePath, Ec))
+        {
+            // Marker could be deleted, so it is not reported
+            continue;
+        }
+
+        const auto FileString = FilePath.filename().generic_string();
+
+        if (FileString.length() != Oid::StringLength)
+            continue;
+
+        if (const Oid SessionId = Oid::FromHexString(FileString))
+        {
+            // Query the size using the same path that was handed to remove();
+            // the bare filename would be resolved against the current working
+            // directory. On failure file_size() returns uintmax_t(-1), so the
+            // error code has to be checked before the size is trusted.
+            std::error_code SizeEc;
+            const auto      MarkerSize = std::filesystem::file_size(FilePath, SizeEc);
+
+            if (!SizeEc && MarkerSize > 0)
+            {
+                OutSessions.push_back(SessionId);
+            }
+        }
+    }
+}
+
+/** Load the metadata object stored in each given session's marker file
+
+    Sessions whose marker does not yield a valid object are skipped, so
+    OutMetadata may receive fewer entries than InSessions contains.
+ */
+void
+FsBeacon::Impl::ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata)
+{
+    for (const Oid& SessionId : InSessions)
+    {
+        const std::filesystem::path MarkerFile = GetSessionMarkerPath(GroupId, SessionId);
+
+        if (CbObject Metadata = LoadCompactBinaryObject(MarkerFile).Object)
+        {
+            OutMetadata.push_back(std::move(Metadata));
+        }
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+FsBeacon::FsBeacon(std::filesystem::path ShareRoot) : m_Impl(std::make_unique<Impl>(ShareRoot))
+{
+}
+
+FsBeacon::~FsBeacon()
+{
+}
+
+void
+FsBeacon::AddGroup(std::string_view GroupId, CbObject Metadata)
+{
+    m_Impl->AddGroup(GroupId, Metadata);
+}
+
+void
+FsBeacon::ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions)
+{
+    m_Impl->ScanGroup(GroupId, OutSessions);
+}
+
+void
+FsBeacon::ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata)
+{
+    m_Impl->ReadMetadata(GroupId, InSessions, OutMetadata);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+} // namespace zen
diff --git a/src/zennet/include/zennet/beacon.h b/src/zennet/include/zennet/beacon.h
new file mode 100644
index 000000000..a8d4805cb
--- /dev/null
+++ b/src/zennet/include/zennet/beacon.h
@@ -0,0 +1,38 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zennet/zennet.h>
+
+#include <zencore/uid.h>
+
+#include <filesystem>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+namespace zen {
+
+class CbObject;
+
+/** File-system based peer discovery
+
+    Intended to be used with an SMB file share as the root.
+ */
+
+class FsBeacon
+{
+public:
+    FsBeacon(std::filesystem::path ShareRoot);
+    ~FsBeacon();
+
+    /** Register the current session in GroupId, publishing Metadata to peers */
+    void AddGroup(std::string_view GroupId, CbObject Metadata);
+    /** Append the ids of the sessions found registered in GroupId to OutSessions */
+    void ScanGroup(std::string_view GroupId, std::vector<Oid>& OutSessions);
+    /** Append the metadata published by each of InSessions (where readable) to OutMetadata */
+    void ReadMetadata(std::string_view GroupId, const std::vector<Oid>& InSessions, std::vector<CbObject>& OutMetadata);
+
+private:
+    struct Impl;
+    std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
diff --git a/src/zennet/include/zennet/statsdclient.h b/src/zennet/include/zennet/statsdclient.h
index c378e49ce..7688c132c 100644
--- a/src/zennet/include/zennet/statsdclient.h
+++ b/src/zennet/include/zennet/statsdclient.h
@@ -8,6 +8,8 @@
 #include <memory>
 #include <string_view>
 
+#undef SendMessage
+
 namespace zen {
 
 class StatsTransportBase
diff --git a/src/zennet/statsdclient.cpp b/src/zennet/statsdclient.cpp
index fe5ca4dda..a0e8cb6ce 100644
--- a/src/zennet/statsdclient.cpp
+++ b/src/zennet/statsdclient.cpp
@@ -12,6 +12,7 @@
 ZEN_THIRD_PARTY_INCLUDES_START
 #include <zencore/windows.h>
 #include <asio.hpp>
+#undef SendMessage
 ZEN_THIRD_PARTY_INCLUDES_END
 
 namespace zen {
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 2319ad66d..ade431393 100644
--- 
a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -4083,7 +4083,8 @@ BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::L } bool -BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes, +BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash, + std::span<const IoHash> ChunkRawHashes, std::span<const uint32_t> ChunkCompressedLengths, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4115,9 +4116,34 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkR uint64_t VerifyChunkSize; CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); - ZEN_ASSERT(CompressedChunk); - ZEN_ASSERT(VerifyChunkHash == ChunkHash); - ZEN_ASSERT(VerifyChunkSize == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (!CompressedChunk) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash)); + } + if (VerifyChunkHash != ChunkHash) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + VerifyChunkHash)); + } + if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) + { + throw std::runtime_error( + fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + VerifyChunkSize, + m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); + } OodleCompressor ChunkCompressor; OodleCompressionLevel ChunkCompressionLevel; @@ -4138,7 +4164,18 @@ 
BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkR { Decompressed = CompressedChunk.Decompress().AsIoBuffer(); } - ZEN_ASSERT(Decompressed.GetSize() == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + + if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + Decompressed.GetSize(), + m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); + } + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { @@ -4237,7 +4274,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription const std::vector<uint32_t> ChunkCompressedLengths = ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize); - if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + if (GetBlockWriteOps(BlockDescription.BlockHash, + BlockDescription.ChunkRawHashes, ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4252,7 +4290,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription return false; } - if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + if (GetBlockWriteOps(BlockDescription.BlockHash, + BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4283,7 +4322,8 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc const MemoryView BlockView = BlockMemoryBuffer.GetView(); BlockWriteOps Ops; - if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + if (GetBlockWriteOps(BlockDescription.BlockHash, + BlockDescription.ChunkRawHashes, 
BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -5334,6 +5374,13 @@ BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content, ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (!Chunk) + { + throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'", + ChunkLocations[0].Offset, + Content.ChunkedContent.ChunkRawSizes[ChunkIndex], + Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]])); + } ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); return Chunk; }; @@ -5362,10 +5409,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, Content.ChunkedContent.ChunkHashes[ChunkIndex], [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); - if (!Chunk) - { - ZEN_ASSERT(false); - } + ZEN_ASSERT(Chunk); uint64_t RawSize = Chunk.GetSize(); const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 6304159ae..9e5bf8d91 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -339,7 +339,8 @@ private: const uint64_t FileOffset, const uint32_t PathIndex); - bool GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes, + bool GetBlockWriteOps(const IoHash& BlockRawHash, + std::span<const IoHash> ChunkRawHashes, std::span<const uint32_t> ChunkCompressedLengths, std::span<std::atomic<uint32_t>> 
SequenceIndexChunksLeftToWriteCounters,
                           std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
diff --git a/src/zenserver-test/function-tests.cpp b/src/zenserver-test/function-tests.cpp
new file mode 100644
index 000000000..559387fa2
--- /dev/null
+++ b/src/zenserver-test/function-tests.cpp
@@ -0,0 +1,34 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/zencore.h>
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/string.h>
+# include <zencore/testing.h>
+# include <zenutil/zenserverprocess.h>
+
+# include "zenserver-test.h"
+
+namespace zen::tests {
+
+using namespace std::literals;
+
+// Smoke test: spawns a server instance with a fresh test directory as its
+// data dir and waits until it reports ready. No requests are issued and
+// nothing is asserted beyond the instance becoming ready.
+TEST_CASE("function.run")
+{
+    std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+    ZenServerInstance Instance(TestEnv);
+    Instance.SetDataDir(TestDir);
+    Instance.SpawnServer(13337);  // argument is presumably the base port - TODO confirm
+
+    ZEN_INFO("Waiting...");
+
+    Instance.WaitUntilReady();
+}
+
+} // namespace zen::tests
+
+#endif
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
new file mode 100644
index 000000000..173f56386
--- /dev/null
+++ b/src/zenserver/compute/computeserver.cpp
@@ -0,0 +1,330 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeserver.h"
+#include <zencompute/httpfunctionservice.h>
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/fmtutils.h>
+# include <zencore/memory/llm.h>
+# include <zencore/memory/memorytrace.h>
+# include <zencore/memory/tagtrace.h>
+# include <zencore/scopeguard.h>
+# include <zencore/sentryintegration.h>
+# include <zencore/system.h>
+# include <zencore/windows.h>
+# include <zenhttp/httpapiservice.h>
+# include <zenstore/cidstore.h>
+# include <zenutil/service.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+// Registers the compute-specific command line options. Both options are
+// strings which default to empty and are stored directly into the
+// ZenComputeServerConfig this configurator was constructed with.
+void
+ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
+{
+    Options.add_option("compute",
+                       "",
+                       "upstream-notification-endpoint",
+                       "Endpoint URL for upstream notifications",
+                       cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""),
+                       "");
+
+    Options.add_option("compute",
+                       "",
+                       "instance-id",
+                       "Instance ID for use in notifications",
+                       cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
+                       "");
+}
+
+// The remaining configurator hooks are intentionally empty for now
+
+void
+ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
+{
+    ZEN_UNUSED(Options);
+}
+
+void
+ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options)
+{
+    ZEN_UNUSED(Options);
+}
+
+void
+ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
+{
+    ZEN_UNUSED(LuaOptions);
+}
+
+void
+ZenComputeServerConfigurator::ValidateOptions()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+ZenComputeServer::ZenComputeServer()
+{
+}
+
+ZenComputeServer::~ZenComputeServer()
+{
+    Cleanup();
+}
+
+// Runs base server initialization, then creates and registers the compute
+// services. Returns the effective base port, or the negative value returned
+// by ZenServerBase::Initialize() if that failed.
+int
+ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry)
+{
+    ZEN_TRACE_CPU("ZenComputeServer::Initialize");
+    ZEN_MEMSCOPE(GetZenserverTag());
+
+    ZEN_INFO(ZEN_APP_NAME " initializing in COMPUTE server mode");
+
+    const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry);
+    if (EffectiveBasePort < 0)
+    {
+        return EffectiveBasePort;
+    }
+
+    // This is a workaround to make sure we can have automated tests. Without
+    // this the ranges for different child zen hub processes could overlap with
+    // the main test range.
+    ZenServerEnvironment::SetBaseChildId(1000);
+
+    m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
+
+    InitializeState(ServerConfig);
+    InitializeServices(ServerConfig);
+    RegisterServices(ServerConfig);
+
+    ZenServerBase::Finalize();
+
+    return EffectiveBasePort;
+}
+
+// Stops the I/O context, joins the I/O thread and closes the HTTP server.
+// Also invoked from the destructor, so exceptions are logged rather than
+// propagated.
+void
+ZenComputeServer::Cleanup()
+{
+    ZEN_TRACE_CPU("ZenComputeServer::Cleanup");
+    ZEN_INFO(ZEN_APP_NAME " cleaning up");
+    try
+    {
+        m_IoContext.stop();
+        if (m_IoRunner.joinable())
+        {
+            m_IoRunner.join();
+        }
+
+        if (m_Http)
+        {
+            m_Http->Close();
+        }
+    }
+    catch (const std::exception& Ex)
+    {
+        ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what());
+    }
+}
+
+void
+ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig)
+{
+    ZEN_UNUSED(ServerConfig);
+}
+
+// Creates the CAS store under <data root>/cas and instantiates the API,
+// compute and function services. Nothing is registered with the HTTP server
+// here, that is done by RegisterServices().
+void
+ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
+{
+    ZEN_INFO("initializing storage");
+
+    CidStoreConfiguration Config;
+    Config.RootDirectory = m_DataRoot / "cas";
+
+    m_CidStore = std::make_unique<CidStore>(m_GcManager);
+    m_CidStore->Initialize(Config);
+
+    ZEN_INFO("instantiating API service");
+    m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
+
+    ZEN_INFO("instantiating compute service");
+    m_ComputeService = std::make_unique<HttpComputeService>(ServerConfig.DataDir / "compute");
+
+    // Ref<zen::compute::FunctionRunner> Runner;
+    // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner");
+
+    // TODO: (re)implement default configuration here
+
+    ZEN_INFO("instantiating function service");
+    m_FunctionService =
+        std::make_unique<zen::compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions");
+}
+
+// Registers every service which was successfully instantiated with the HTTP server
+void
+ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig)
+{
+    ZEN_UNUSED(ServerConfig);
+
+    if (m_ComputeService)
+    {
+        m_Http->RegisterService(*m_ComputeService);
+    }
+
+    if (m_ApiService)
+    {
+        m_Http->RegisterService(*m_ApiService);
+    }
+
+    if (m_FunctionService)
+    {
+        m_Http->RegisterService(*m_FunctionService);
+    }
+}
+
+// Main loop: logs startup information, signals readiness via OnReady() and
+// then blocks in the HTTP server's Run() until it returns.
+void
+ZenComputeServer::Run()
+{
+    if (m_ProcessMonitor.IsActive())
+    {
+        CheckOwnerPid();
+    }
+
+    if (!m_TestMode)
+    {
+        // clang-format off
+        ZEN_INFO( R"(__________ _________ __ )" "\n"
+                  R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n"
+                  R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n"
+                  R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n"
+                  R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n"
+                  R"( \/ \/ \/ \/ \/|__| \/ )");
+        // clang-format on
+
+        ExtendableStringBuilder<256> BuildOptions;
+        GetBuildOptions(BuildOptions, '\n');
+        ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions);
+    }
+
+    ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId());
+
+# if ZEN_PLATFORM_WINDOWS
+    if (zen::windows::IsRunningOnWine())
+    {
+        ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well");
+    }
+# endif
+
+# if ZEN_USE_SENTRY
+    ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED");
+    if (m_UseSentry)
+    {
+        SentryIntegration::ClearCaches();
+    }
+# endif
+
+    if (m_DebugOptionForcedCrash)
+    {
+        ZEN_DEBUG_BREAK();
+    }
+
+    const bool IsInteractiveMode = IsInteractiveSession();  // &&!m_TestMode;
+
+    SetNewState(kRunning);
+
+    OnReady();
+
+    m_Http->Run(IsInteractiveMode);
+
+    SetNewState(kShuttingDown);
+
+    ZEN_INFO(ZEN_APP_NAME " exiting");
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+
+ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions)
+: ZenServerMain(ServerOptions)
+, m_ServerOptions(ServerOptions)
+{
+}
+
+// Creates, initializes and runs a ZenComputeServer. A monitor thread waits
+// on the named event "Zen_<port>_Shutdown" and requests server exit when it
+// is signalled. The process exits with code 1 if initialization fails.
+void
+ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
+{
+    ZenComputeServer Server;
+    Server.SetDataRoot(m_ServerOptions.DataDir);
+    Server.SetContentRoot(m_ServerOptions.ContentDir);
+    Server.SetTestMode(m_ServerOptions.IsTest);
+    Server.SetDedicatedMode(m_ServerOptions.IsDedicated);
+
+    // Initialize() forwards any negative value from the base class as a
+    // failure, so check for the whole negative range rather than only -1.
+    // Otherwise a negative port would be truncated into uint16_t below.
+    const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry);
+    if (EffectiveBasePort < 0)
+    {
+        // Server.Initialize has already logged what the issue is - just exit with failure code here.
+        std::exit(1);
+    }
+
+    Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+    if (EffectiveBasePort != m_ServerOptions.BasePort)
+    {
+        ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+        m_ServerOptions.BasePort = EffectiveBasePort;
+    }
+
+    std::unique_ptr<std::thread> ShutdownThread;
+    std::unique_ptr<NamedEvent>  ShutdownEvent;
+
+    ExtendableStringBuilder<64> ShutdownEventName;
+    ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown";
+    ShutdownEvent.reset(new NamedEvent{ShutdownEventName});
+
+    // Monitor shutdown signals
+
+    ShutdownThread.reset(new std::thread{[&] {
+        SetCurrentThreadName("shutdown_mon");
+
+        ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId());
+
+        if (ShutdownEvent->Wait())
+        {
+            ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId());
+            Server.RequestExit(0);
+        }
+        else
+        {
+            ZEN_INFO("shutdown signal wait() failed");
+        }
+    }});
+
+    // On scope exit: signal the event ourselves so the monitor thread wakes
+    // up, then join it
+    auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] {
+        ReportServiceStatus(ServiceStatus::Stopping);
+
+        if (ShutdownEvent)
+        {
+            ShutdownEvent->Set();
+        }
+        if (ShutdownThread && ShutdownThread->joinable())
+        {
+            ShutdownThread->join();
+        }
+    });
+
+    // If we have a parent process, establish the mechanisms we need
+    // to be able to communicate readiness with the parent
+
+    Server.SetIsReadyFunc([&] {
+        std::error_code Ec;
+        m_LockFile.Update(MakeLockData(true), Ec);
+        ReportServiceStatus(ServiceStatus::Running);
+        NotifyReady();
+    });
+
+    Server.Run();
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
new file mode 100644
index 000000000..625140b23
--- /dev/null
+++ b/src/zenserver/compute/computeserver.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenserver.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenstore/gc.h>
+
+namespace cxxopts {
+class Options;
+}
+namespace zen::LuaConfig {
+struct Options;
+}
+
+namespace zen::compute {
+class HttpFunctionService;
+}
+
+namespace zen {
+
+class CidStore;
+class HttpApiService;
+class HttpComputeService;
+
+/** Compute server specific settings, in addition to the common ZenServerConfig */
+struct ZenComputeServerConfig : public ZenServerConfig
+{
+    std::string UpstreamNotificationEndpoint;
+    std::string InstanceId;  // For use in notifications
+};
+
+/** Option handling hooks for the compute server
+
+    Holds a reference to the config it populates, so the config object
+    must outlive the configurator.
+ */
+struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase
+{
+    ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions)
+    : ZenServerConfiguratorBase(ServerOptions)
+    , m_ServerOptions(ServerOptions)
+    {
+    }
+
+    ~ZenComputeServerConfigurator() = default;
+
+private:
+    virtual void AddCliOptions(cxxopts::Options& Options) override;
+    virtual void AddConfigOptions(LuaConfig::Options& Options) override;
+    virtual void ApplyOptions(cxxopts::Options& Options) override;
+    virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override;
+    virtual void ValidateOptions() override;
+
+    ZenComputeServerConfig& m_ServerOptions;
+};
+
+/** Entry point wrapper for the compute server
+
+    Non-copyable. Holds a reference to the server options. The Config and
+    Configurator typedefs name the matching option types.
+ */
+class ZenComputeServerMain : public ZenServerMain
+{
+public:
+    ZenComputeServerMain(ZenComputeServerConfig& ServerOptions);
+    virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override;
+
+    ZenComputeServerMain(const ZenComputeServerMain&) = delete;
+    ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete;
+
+    typedef ZenComputeServerConfig       Config;
+    typedef ZenComputeServerConfigurator Configurator;
+
+private:
+    ZenComputeServerConfig& m_ServerOptions;
+};
+
+/**
+ * The compute server handles DDC build function execution requests
+ * only. It's intended to be used on a pure compute resource and does
+ * not handle any storage tasks. The actual scheduling happens upstream
+ * in a storage server instance.
+ */
+
+class ZenComputeServer : public ZenServerBase
+{
+    // Not movable
+    ZenComputeServer& operator=(ZenComputeServer&&) = delete;
+    ZenComputeServer(ZenComputeServer&&) = delete;
+
+public:
+    ZenComputeServer();
+    ~ZenComputeServer();
+
+    int  Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry);
+    void Run();
+    void Cleanup();
+
+private:
+    HttpStatsService                                   m_StatsService;
+    GcManager                                          m_GcManager;
+    GcScheduler                                        m_GcScheduler{m_GcManager};
+    std::unique_ptr<CidStore>                          m_CidStore;
+    std::unique_ptr<HttpComputeService>                m_ComputeService;
+    std::unique_ptr<HttpApiService>                    m_ApiService;
+    std::unique_ptr<zen::compute::HttpFunctionService> m_FunctionService;
+
+    // Initialization steps, declared in the order Initialize() is expected
+    // to run them - NOTE(review): confirm against the implementation
+    void InitializeState(const ZenComputeServerConfig& ServerConfig);
+    void InitializeServices(const ZenComputeServerConfig& ServerConfig);
+    void RegisterServices(const ZenComputeServerConfig& ServerConfig);
+};
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeservice.cpp b/src/zenserver/compute/computeservice.cpp
new file mode 100644
index 000000000..2c0bc0ae9
--- /dev/null
+++ b/src/zenserver/compute/computeservice.cpp
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/system.h>
+# include <zenutil/zenserverprocess.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/fixed_vector.h>
+# include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+# include <unordered_map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Not referenced anywhere else in this file yet
+struct ResourceMetrics
+{
+    uint64_t DiskUsageBytes   = 0;
+    uint64_t MemoryUsageBytes = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+// Service implementation. Currently a stub: Initialize() and Cleanup() do
+// nothing and there is no state.
+struct HttpComputeService::Impl
+{
+    Impl(const Impl&) = delete;
+    Impl& operator=(const Impl&) = delete;
+
+    Impl();
+    ~Impl();
+
+    void Initialize(std::filesystem::path BaseDir) { ZEN_UNUSED(BaseDir); }
+
+    void Cleanup() {}
+
+private:
+};
+
+HttpComputeService::Impl::Impl()
+{
+}
+
+HttpComputeService::Impl::~Impl()
+{
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+// Sets up the implementation object and registers the GET routes:
+//   status - responds with an object holding an empty "modules" array
+//   stats  - responds with an empty object
+HttpComputeService::HttpComputeService(std::filesystem::path BaseDir) : m_Impl(std::make_unique<Impl>())
+{
+    using namespace std::literals;
+
+    m_Impl->Initialize(BaseDir);
+
+    // The handlers do not touch any member state, so nothing is captured
+    // (an unused 'this' capture triggers -Wunused-lambda-capture)
+    m_Router.RegisterRoute(
+        "status",
+        [](HttpRouterRequest& Req) {
+            CbObjectWriter Obj;
+            Obj.BeginArray("modules");
+            Obj.EndArray();
+            Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+        },
+        HttpVerb::kGet);
+
+    m_Router.RegisterRoute(
+        "stats",
+        [](HttpRouterRequest& Req) {
+            CbObjectWriter Obj;
+            Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+        },
+        HttpVerb::kGet);
+}
+
+HttpComputeService::~HttpComputeService()
+{
+}
+
+const char*
+HttpComputeService::BaseUri() const
+{
+    return "/compute/";
+}
+
+// All requests are dispatched through the router configured in the constructor
+void
+HttpComputeService::HandleRequest(zen::HttpServerRequest& Request)
+{
+    m_Router.HandleRequest(Request);
+}
+
+} // namespace zen
+#endif // 
ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/computeservice.h b/src/zenserver/compute/computeservice.h
new file mode 100644
index 000000000..339200dd8
--- /dev/null
+++ b/src/zenserver/compute/computeservice.h
@@ -0,0 +1,36 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+namespace zen {
+
+/** ZenServer Compute Service
+ *
+ * Manages a set of compute workers for use in UEFN content worker
+ *
+ * HTTP service; non-copyable. Implementation details are hidden behind
+ * a private Impl object.
+ */
+class HttpComputeService : public zen::HttpService
+{
+public:
+    // NOTE(review): BaseDir is presumably the service's working directory - confirm
+    HttpComputeService(std::filesystem::path BaseDir);
+    ~HttpComputeService();
+
+    HttpComputeService(const HttpComputeService&) = delete;
+    HttpComputeService& operator=(const HttpComputeService&) = delete;
+
+    virtual const char* BaseUri() const override;
+    virtual void        HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+    HttpRequestRouter m_Router;
+
+    struct Impl;
+
+    std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/frontend/html/compute.html b/src/zenserver/frontend/html/compute.html
new file mode 100644
index 000000000..668189fe5
--- /dev/null
+++ b/src/zenserver/frontend/html/compute.html
@@ -0,0 +1,991 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Zen Compute Dashboard</title>
+    <script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/chart.umd.min.js"></script>
+    <style>
+        * {
+            margin: 0;
+            padding: 0;
+            box-sizing: border-box;
+        }
+
+        body {
+            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
+            background: #0d1117;
+            color: #c9d1d9;
+            padding: 20px;
+        }
+
+        .container {
+            max-width: 1400px;
+            margin: 0 auto;
+        }
+
+        h1 {
+            font-size: 32px;
+            font-weight: 600;
+            margin-bottom: 10px;
+            color: #f0f6fc;
+        }
+
+        .header {
+            display: flex;
+ 
justify-content: space-between; + align-items: center; + margin-bottom: 30px; + } + + .health-indicator { + display: flex; + align-items: center; + gap: 8px; + font-size: 14px; + padding: 8px 16px; + border-radius: 6px; + background: #161b22; + border: 1px solid #30363d; + } + + .health-indicator .status-dot { + width: 10px; + height: 10px; + border-radius: 50%; + background: #6e7681; + } + + .health-indicator.healthy .status-dot { + background: #3fb950; + } + + .health-indicator.unhealthy .status-dot { + background: #f85149; + } + + .grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(280px, 1fr)); + gap: 20px; + margin-bottom: 30px; + } + + .card { + background: #161b22; + border: 1px solid #30363d; + border-radius: 6px; + padding: 20px; + } + + .card-title { + font-size: 14px; + font-weight: 600; + color: #8b949e; + margin-bottom: 12px; + text-transform: uppercase; + letter-spacing: 0.5px; + } + + .metric-value { + font-size: 36px; + font-weight: 600; + color: #f0f6fc; + line-height: 1; + } + + .metric-label { + font-size: 12px; + color: #8b949e; + margin-top: 4px; + } + + .chart-container { + position: relative; + height: 300px; + margin-top: 20px; + } + + .stats-row { + display: flex; + justify-content: space-between; + margin-bottom: 12px; + padding: 8px 0; + border-bottom: 1px solid #21262d; + } + + .stats-row:last-child { + border-bottom: none; + margin-bottom: 0; + } + + .stats-label { + color: #8b949e; + font-size: 13px; + } + + .stats-value { + color: #f0f6fc; + font-weight: 600; + font-size: 13px; + } + + .rate-stats { + display: grid; + grid-template-columns: repeat(3, 1fr); + gap: 16px; + margin-top: 16px; + } + + .rate-item { + text-align: center; + } + + .rate-value { + font-size: 20px; + font-weight: 600; + color: #58a6ff; + } + + .rate-label { + font-size: 11px; + color: #8b949e; + margin-top: 4px; + text-transform: uppercase; + } + + .progress-bar { + width: 100%; + height: 8px; + background: #21262d; + border-radius: 4px; + 
overflow: hidden; + margin-top: 8px; + } + + .progress-fill { + height: 100%; + background: #58a6ff; + transition: width 0.3s ease; + } + + .timestamp { + font-size: 12px; + color: #6e7681; + text-align: right; + margin-top: 30px; + } + + .error { + color: #f85149; + padding: 12px; + background: #1c1c1c; + border-radius: 6px; + margin: 20px 0; + font-size: 13px; + } + + .section-title { + font-size: 20px; + font-weight: 600; + margin-bottom: 20px; + color: #f0f6fc; + } + + .worker-row { + cursor: pointer; + transition: background 0.15s; + } + + .worker-row:hover { + background: #1c2128; + } + + .worker-row.selected { + background: #1f2d3d; + } + + .worker-detail { + margin-top: 20px; + border-top: 1px solid #30363d; + padding-top: 16px; + } + + .worker-detail-title { + font-size: 15px; + font-weight: 600; + color: #f0f6fc; + margin-bottom: 12px; + } + + .detail-section { + margin-bottom: 16px; + } + + .detail-section-label { + font-size: 11px; + font-weight: 600; + color: #8b949e; + text-transform: uppercase; + letter-spacing: 0.5px; + margin-bottom: 6px; + } + + .detail-table { + width: 100%; + border-collapse: collapse; + font-size: 12px; + } + + .detail-table td { + padding: 4px 8px; + color: #c9d1d9; + border-bottom: 1px solid #21262d; + vertical-align: top; + } + + .detail-table td:first-child { + color: #8b949e; + width: 40%; + font-family: monospace; + } + + .detail-table tr:last-child td { + border-bottom: none; + } + + .detail-mono { + font-family: monospace; + font-size: 11px; + color: #8b949e; + } + + .detail-tag { + display: inline-block; + padding: 2px 8px; + border-radius: 4px; + background: #21262d; + color: #c9d1d9; + font-size: 11px; + margin: 2px 4px 2px 0; + } + + .status-badge { + display: inline-block; + padding: 2px 8px; + border-radius: 4px; + font-size: 11px; + font-weight: 600; + } + + .status-badge.success { + background: rgba(63, 185, 80, 0.15); + color: #3fb950; + } + + .status-badge.failure { + background: rgba(248, 81, 73, 0.15); + 
color: #f85149; + } + </style> +</head> +<body> + <div class="container"> + <div class="header"> + <div> + <h1>Zen Compute Dashboard</h1> + <div class="timestamp">Last updated: <span id="last-update">Never</span></div> + </div> + <div class="health-indicator" id="health-indicator"> + <div class="status-dot"></div> + <span id="health-text">Checking...</span> + </div> + </div> + + <div id="error-container"></div> + + <!-- Action Queue Stats --> + <div class="section-title">Action Queue</div> + <div class="grid"> + <div class="card"> + <div class="card-title">Pending Actions</div> + <div class="metric-value" id="actions-pending">-</div> + <div class="metric-label">Waiting to be scheduled</div> + </div> + <div class="card"> + <div class="card-title">Running Actions</div> + <div class="metric-value" id="actions-running">-</div> + <div class="metric-label">Currently executing</div> + </div> + <div class="card"> + <div class="card-title">Completed Actions</div> + <div class="metric-value" id="actions-complete">-</div> + <div class="metric-label">Results available</div> + </div> + </div> + + <!-- Action Queue Chart --> + <div class="card" style="margin-bottom: 30px;"> + <div class="card-title">Action Queue History</div> + <div class="chart-container"> + <canvas id="queue-chart"></canvas> + </div> + </div> + + <!-- Performance Metrics --> + <div class="section-title">Performance Metrics</div> + <div class="card" style="margin-bottom: 30px;"> + <div class="card-title">Completion Rate</div> + <div class="rate-stats"> + <div class="rate-item"> + <div class="rate-value" id="rate-1">-</div> + <div class="rate-label">1 min rate</div> + </div> + <div class="rate-item"> + <div class="rate-value" id="rate-5">-</div> + <div class="rate-label">5 min rate</div> + </div> + <div class="rate-item"> + <div class="rate-value" id="rate-15">-</div> + <div class="rate-label">15 min rate</div> + </div> + </div> + <div style="margin-top: 20px;"> + <div class="stats-row"> + <span 
class="stats-label">Total Retired</span> + <span class="stats-value" id="retired-count">-</span> + </div> + <div class="stats-row"> + <span class="stats-label">Mean Rate</span> + <span class="stats-value" id="rate-mean">-</span> + </div> + </div> + </div> + + <!-- Workers --> + <div class="section-title">Workers</div> + <div class="card" style="margin-bottom: 30px;"> + <div class="card-title">Worker Status</div> + <div class="stats-row"> + <span class="stats-label">Registered Workers</span> + <span class="stats-value" id="worker-count">-</span> + </div> + <div id="worker-table-container" style="margin-top: 16px; display: none;"> + <table id="worker-table" style="width: 100%; border-collapse: collapse; font-size: 13px;"> + <thead> + <tr> + <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Name</th> + <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Platform</th> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Cores</th> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Timeout</th> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Functions</th> + <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Worker ID</th> + </tr> + </thead> + <tbody id="worker-table-body"></tbody> + 
</table> + <div id="worker-detail" class="worker-detail" style="display: none;"></div> + </div> + </div> + + <!-- Action History --> + <div class="section-title">Recent Actions</div> + <div class="card" style="margin-bottom: 30px;"> + <div class="card-title">Action History</div> + <div id="action-history-empty" style="color: #6e7681; font-size: 13px;">No actions recorded yet.</div> + <div id="action-history-container" style="display: none;"> + <table id="action-history-table" style="width: 100%; border-collapse: collapse; font-size: 13px;"> + <thead> + <tr> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 60px;">LSN</th> + <th style="text-align: center; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 70px;">Status</th> + <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Function</th> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 80px;">Started</th> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 80px;">Finished</th> + <th style="text-align: right; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px; width: 80px;">Duration</th> + <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 
11px;">Worker ID</th> + <th style="text-align: left; color: #8b949e; padding: 6px 8px; border-bottom: 1px solid #30363d; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; font-size: 11px;">Action ID</th> + </tr> + </thead> + <tbody id="action-history-body"></tbody> + </table> + </div> + </div> + + <!-- System Resources --> + <div class="section-title">System Resources</div> + <div class="grid"> + <div class="card"> + <div class="card-title">CPU Usage</div> + <div class="metric-value" id="cpu-usage">-</div> + <div class="metric-label">Percent</div> + <div class="progress-bar"> + <div class="progress-fill" id="cpu-progress" style="width: 0%"></div> + </div> + <div style="position: relative; height: 60px; margin-top: 12px;"> + <canvas id="cpu-chart"></canvas> + </div> + <div style="margin-top: 12px;"> + <div class="stats-row"> + <span class="stats-label">Packages</span> + <span class="stats-value" id="cpu-packages">-</span> + </div> + <div class="stats-row"> + <span class="stats-label">Physical Cores</span> + <span class="stats-value" id="cpu-cores">-</span> + </div> + <div class="stats-row"> + <span class="stats-label">Logical Processors</span> + <span class="stats-value" id="cpu-lp">-</span> + </div> + </div> + </div> + <div class="card"> + <div class="card-title">Memory</div> + <div class="stats-row"> + <span class="stats-label">Used</span> + <span class="stats-value" id="memory-used">-</span> + </div> + <div class="stats-row"> + <span class="stats-label">Total</span> + <span class="stats-value" id="memory-total">-</span> + </div> + <div class="progress-bar"> + <div class="progress-fill" id="memory-progress" style="width: 0%"></div> + </div> + </div> + <div class="card"> + <div class="card-title">Disk</div> + <div class="stats-row"> + <span class="stats-label">Used</span> + <span class="stats-value" id="disk-used">-</span> + </div> + <div class="stats-row"> + <span class="stats-label">Total</span> + <span class="stats-value" 
id="disk-total">-</span> + </div> + <div class="progress-bar"> + <div class="progress-fill" id="disk-progress" style="width: 0%"></div> + </div> + </div> + </div> + </div> + + <script> + // Configuration + const BASE_URL = window.location.origin; + const REFRESH_INTERVAL = 2000; // 2 seconds + const MAX_HISTORY_POINTS = 60; // Show last 2 minutes + + // Data storage + const history = { + timestamps: [], + pending: [], + running: [], + completed: [], + cpu: [] + }; + + // CPU sparkline chart + const cpuCtx = document.getElementById('cpu-chart').getContext('2d'); + const cpuChart = new Chart(cpuCtx, { + type: 'line', + data: { + labels: [], + datasets: [{ + data: [], + borderColor: '#58a6ff', + backgroundColor: 'rgba(88, 166, 255, 0.15)', + borderWidth: 1.5, + tension: 0.4, + fill: true, + pointRadius: 0 + }] + }, + options: { + responsive: true, + maintainAspectRatio: false, + animation: false, + plugins: { legend: { display: false }, tooltip: { enabled: false } }, + scales: { + x: { display: false }, + y: { display: false, min: 0, max: 100 } + } + } + }); + + // Queue chart setup + const ctx = document.getElementById('queue-chart').getContext('2d'); + const chart = new Chart(ctx, { + type: 'line', + data: { + labels: [], + datasets: [ + { + label: 'Pending', + data: [], + borderColor: '#f0883e', + backgroundColor: 'rgba(240, 136, 62, 0.1)', + tension: 0.4, + fill: true + }, + { + label: 'Running', + data: [], + borderColor: '#58a6ff', + backgroundColor: 'rgba(88, 166, 255, 0.1)', + tension: 0.4, + fill: true + }, + { + label: 'Completed', + data: [], + borderColor: '#3fb950', + backgroundColor: 'rgba(63, 185, 80, 0.1)', + tension: 0.4, + fill: true + } + ] + }, + options: { + responsive: true, + maintainAspectRatio: false, + plugins: { + legend: { + display: true, + labels: { + color: '#8b949e' + } + } + }, + scales: { + x: { + display: false + }, + y: { + beginAtZero: true, + ticks: { + color: '#8b949e' + }, + grid: { + color: '#21262d' + } + } + } + } + }); + + 
// Helper functions

// Characters that must be replaced before text is interpolated into markup.
const HTML_ESCAPES = { '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;' };

// Escapes a value for safe interpolation into an HTML template string.
// Everything the server sends (worker names, paths, environment entries,
// error text) goes through here before it reaches innerHTML.
function escapeHtml(value) {
    return String(value).replace(/[&<>"']/g, ch => HTML_ESCAPES[ch]);
}

// Formats a byte count using binary (1024-based) units, e.g. 1536 -> "1.5 KB".
// Returns '-' for negative or non-finite input. The unit index is clamped so
// that values below 1 byte or beyond the largest unit never index outside the
// unit table.
function formatBytes(bytes) {
    if (bytes === 0) return '0 B';
    if (!Number.isFinite(bytes) || bytes < 0) return '-';
    const k = 1024;
    const sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'];
    const rawIndex = Math.floor(Math.log(bytes) / Math.log(k));
    const i = Math.min(Math.max(rawIndex, 0), sizes.length - 1);
    return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
}

// Formats a per-second rate with two decimals, e.g. 1.234 -> "1.23/s".
function formatRate(rate) {
    return rate.toFixed(2) + '/s';
}

// Shows an error banner. The message is escaped because it may contain
// text that originated from the server (HTTP status text).
function showError(message) {
    const container = document.getElementById('error-container');
    container.innerHTML = `<div class="error">Error: ${escapeHtml(message)}</div>`;
}

// Removes the error banner, if any.
function clearError() {
    document.getElementById('error-container').innerHTML = '';
}

// Stamps the "last update" label with the current local time.
function updateTimestamp() {
    const now = new Date();
    document.getElementById('last-update').textContent = now.toLocaleTimeString();
}

// Fetch functions

// GETs `endpoint` relative to BASE_URL and returns the parsed JSON body.
// Throws an Error carrying the HTTP status for any non-2xx response.
async function fetchJSON(endpoint) {
    const response = await fetch(`${BASE_URL}${endpoint}`, {
        headers: {
            'Accept': 'application/json'
        }
    });
    if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    return await response.json();
}

// Polls the readiness endpoint and updates the health indicator.
// Returns true when the endpoint answered 200. A network failure marks the
// indicator as 'Error' and rethrows so the caller can surface it.
async function fetchHealth() {
    const indicator = document.getElementById('health-indicator');
    const text = document.getElementById('health-text');

    try {
        const response = await fetch(`${BASE_URL}/apply/ready`);
        const isHealthy = response.status === 200;

        if (isHealthy) {
            indicator.classList.add('healthy');
            indicator.classList.remove('unhealthy');
            text.textContent = 'Healthy';
        } else {
            indicator.classList.add('unhealthy');
            indicator.classList.remove('healthy');
            text.textContent = 'Unhealthy';
        }

        return isHealthy;
    } catch (error) {
        indicator.classList.add('unhealthy');
        indicator.classList.remove('healthy');
        text.textContent = 'Error';
        throw error;
    }
}

// Fetches action statistics, updates the counters and rate labels, and
// appends one sample to the queue chart history.
async function fetchStats() {
    const data = await fetchJSON('/stats/apply');

    const pending = data.actions_pending || 0;
    // NOTE(review): the "running" counter is fed from `actions_submitted` -
    // confirm that this is the intended field.
    const running = data.actions_submitted || 0;
    const complete = data.actions_complete || 0;

    // Update action counts
    document.getElementById('actions-pending').textContent = pending;
    document.getElementById('actions-running').textContent = running;
    document.getElementById('actions-complete').textContent = complete;

    // Update completion rates
    const retired = data.actions_retired;
    if (retired) {
        document.getElementById('rate-1').textContent = formatRate(retired.rate_1 || 0);
        document.getElementById('rate-5').textContent = formatRate(retired.rate_5 || 0);
        document.getElementById('rate-15').textContent = formatRate(retired.rate_15 || 0);
        document.getElementById('retired-count').textContent = retired.count || 0;
        document.getElementById('rate-mean').textContent = formatRate(retired.rate_mean || 0);
    }

    // Update chart
    history.timestamps.push(new Date().toLocaleTimeString());
    history.pending.push(pending);
    history.running.push(running);
    history.completed.push(complete);

    // Keep only last N points
    if (history.timestamps.length > MAX_HISTORY_POINTS) {
        history.timestamps.shift();
        history.pending.shift();
        history.running.shift();
        history.completed.shift();
    }

    chart.data.labels = history.timestamps;
    chart.data.datasets[0].data = history.pending;
    chart.data.datasets[1].data = history.running;
    chart.data.datasets[2].data = history.completed;
    chart.update('none');
}

// Fetches system information and refreshes the CPU, memory and disk cards.
async function fetchSysInfo() {
    const data = await fetchJSON('/apply/sysinfo');

    // Update CPU
    const cpuUsage = data.cpu_usage || 0;
    document.getElementById('cpu-usage').textContent = cpuUsage.toFixed(1) + '%';
    document.getElementById('cpu-progress').style.width = cpuUsage + '%';

    history.cpu.push(cpuUsage);
    if (history.cpu.length > MAX_HISTORY_POINTS) history.cpu.shift();
    cpuChart.data.labels = history.cpu.map(() => '');
    cpuChart.data.datasets[0].data = history.cpu;
    cpuChart.update('none');

    document.getElementById('cpu-packages').textContent = data.cpu_count ?? '-';
    document.getElementById('cpu-cores').textContent = data.core_count ?? '-';
    document.getElementById('cpu-lp').textContent = data.lp_count ?? '-';

    // Update Memory (total falls back to 1 to avoid dividing by zero)
    const memUsed = data.memory_used || 0;
    const memTotal = data.memory_total || 1;
    const memPercent = (memUsed / memTotal) * 100;
    document.getElementById('memory-used').textContent = formatBytes(memUsed);
    document.getElementById('memory-total').textContent = formatBytes(memTotal);
    document.getElementById('memory-progress').style.width = memPercent + '%';

    // Update Disk (same zero-guard as memory)
    const diskUsed = data.disk_used || 0;
    const diskTotal = data.disk_total || 1;
    const diskPercent = (diskUsed / diskTotal) * 100;
    document.getElementById('disk-used').textContent = formatBytes(diskUsed);
    document.getElementById('disk-total').textContent = formatBytes(diskTotal);
    document.getElementById('disk-progress').style.width = diskPercent + '%';
}

// Persists the selected worker ID across refreshes
let selectedWorkerId = null;

// Renders the detail panel for worker `id` from its descriptor `desc`.
// A missing descriptor hides the panel. All descriptor values are escaped
// before being placed into the panel markup.
function renderWorkerDetail(id, desc) {
    const panel = document.getElementById('worker-detail');

    if (!desc) {
        panel.style.display = 'none';
        return;
    }

    const NONE_HTML = '<span style="color:#6e7681;font-size:12px;">none</span>';

    // `valueHtml` is trusted markup; callers escape before passing it in.
    function row(label, valueHtml, cellAttrs = '') {
        return `<tr><td>${label}</td><td${cellAttrs}>${valueHtml}</td></tr>`;
    }

    function field(label, value) {
        return row(label, escapeHtml(value ?? '-'));
    }

    function monoField(label, value) {
        return row(label, escapeHtml(value ?? '-'), ' class="detail-mono"');
    }

    // Functions
    const functions = desc.functions || [];
    const functionsHtml = functions.length === 0 ? NONE_HTML :
        `<table class="detail-table">${functions.map(f =>
            `<tr><td>${escapeHtml(f.name || '-')}</td><td class="detail-mono">${escapeHtml(f.version || '-')}</td></tr>`
        ).join('')}</table>`;

    // Executables
    const executables = desc.executables || [];
    const totalExecSize = executables.reduce((sum, e) => sum + (e.size || 0), 0);
    const execHtml = executables.length === 0 ? NONE_HTML :
        `<table class="detail-table">
            <tr style="font-size:11px;">
                <td style="color:#6e7681;padding-bottom:4px;">Path</td>
                <td style="color:#6e7681;padding-bottom:4px;">Hash</td>
                <td style="color:#6e7681;padding-bottom:4px;text-align:right;">Size</td>
            </tr>
            ${executables.map(e =>
                `<tr>
                    <td>${escapeHtml(e.name || '-')}</td>
                    <td class="detail-mono">${escapeHtml(e.hash || '-')}</td>
                    <td style="text-align:right;white-space:nowrap;">${e.size != null ? escapeHtml(formatBytes(e.size)) : '-'}</td>
                </tr>`
            ).join('')}
            <tr style="border-top:1px solid #30363d;">
                <td style="color:#8b949e;padding-top:6px;">Total</td>
                <td></td>
                <td style="text-align:right;white-space:nowrap;padding-top:6px;color:#f0f6fc;font-weight:600;">${escapeHtml(formatBytes(totalExecSize))}</td>
            </tr>
        </table>`;

    // Files (an entry is rendered via its `name`, or as-is when it has none)
    const files = desc.files || [];
    const filesHtml = files.length === 0 ? NONE_HTML :
        `<table class="detail-table">${files.map(f =>
            `<tr><td>${escapeHtml(f.name || f)}</td><td class="detail-mono">${escapeHtml(f.hash || '')}</td></tr>`
        ).join('')}</table>`;

    // Dirs
    const dirs = desc.dirs || [];
    const dirsHtml = dirs.length === 0 ? NONE_HTML :
        dirs.map(d => `<span class="detail-tag">${escapeHtml(d)}</span>`).join('');

    // Environment
    const env = desc.environment || [];
    const envHtml = env.length === 0 ? NONE_HTML :
        env.map(e => `<span class="detail-tag">${escapeHtml(e)}</span>`).join('');

    panel.innerHTML = `
        <div class="worker-detail-title">${escapeHtml(desc.name || id)}</div>
        <div class="detail-section">
            <table class="detail-table">
                ${row('Worker ID', `<span class="detail-mono">${escapeHtml(id)}</span>`)}
                ${field('Path', desc.path)}
                ${field('Platform', desc.host)}
                ${monoField('Build System', desc.buildsystem_version)}
                ${field('Cores', desc.cores)}
                ${field('Timeout', desc.timeout != null ? desc.timeout + 's' : null)}
            </table>
        </div>
        <div class="detail-section">
            <div class="detail-section-label">Functions</div>
            ${functionsHtml}
        </div>
        <div class="detail-section">
            <div class="detail-section-label">Executables</div>
            ${execHtml}
        </div>
        <div class="detail-section">
            <div class="detail-section-label">Files</div>
            ${filesHtml}
        </div>
        <div class="detail-section">
            <div class="detail-section-label">Directories</div>
            ${dirsHtml}
        </div>
        <div class="detail-section">
            <div class="detail-section-label">Environment</div>
            ${envHtml}
        </div>
    `;
    panel.style.display = 'block';
}

// Fetches the worker list plus one descriptor per worker, rebuilds the worker
// table, and keeps the detail panel in sync with the current selection.
// A descriptor that fails to load is shown as a row of '-' placeholders.
async function fetchWorkers() {
    const data = await fetchJSON('/apply/workers');
    const workerIds = data.workers || [];

    document.getElementById('worker-count').textContent = workerIds.length;

    const container = document.getElementById('worker-table-container');
    const tbody = document.getElementById('worker-table-body');

    if (workerIds.length === 0) {
        container.style.display = 'none';
        selectedWorkerId = null;
        return;
    }

    const descriptors = await Promise.all(
        workerIds.map(id => fetchJSON(`/apply/workers/${id}`).catch(() => null))
    );

    // Build a map for quick lookup by ID
    const descriptorMap = new Map();
    workerIds.forEach((id, i) => descriptorMap.set(id, descriptors[i]));

    const CELL = 'padding: 6px 8px; border-bottom: 1px solid #21262d;';

    tbody.innerHTML = '';
    descriptors.forEach((desc, i) => {
        const id = workerIds[i];
        const name = desc ? (desc.name || '-') : '-';
        const host = desc ? (desc.host || '-') : '-';
        const cores = desc && desc.cores != null ? desc.cores : '-';
        const timeout = desc && desc.timeout != null ? desc.timeout + 's' : '-';
        const functions = desc ? (desc.functions ? desc.functions.length : 0) : '-';

        const tr = document.createElement('tr');
        tr.className = 'worker-row' + (id === selectedWorkerId ? ' selected' : '');
        tr.dataset.workerId = id;
        tr.innerHTML = `
            <td style="${CELL} color: #f0f6fc;">${escapeHtml(name)}</td>
            <td style="${CELL} color: #c9d1d9;">${escapeHtml(host)}</td>
            <td style="${CELL} color: #c9d1d9; text-align: right;">${escapeHtml(cores)}</td>
            <td style="${CELL} color: #c9d1d9; text-align: right;">${escapeHtml(timeout)}</td>
            <td style="${CELL} color: #c9d1d9; text-align: right;">${escapeHtml(functions)}</td>
            <td style="${CELL} color: #8b949e; font-family: monospace; font-size: 11px;">${escapeHtml(id)}</td>
        `;
        tr.addEventListener('click', () => {
            document.querySelectorAll('.worker-row').forEach(r => r.classList.remove('selected'));
            if (selectedWorkerId === id) {
                // Toggle off
                selectedWorkerId = null;
                document.getElementById('worker-detail').style.display = 'none';
            } else {
                selectedWorkerId = id;
                tr.classList.add('selected');
                renderWorkerDetail(id, descriptorMap.get(id));
            }
        });
        tbody.appendChild(tr);
    });

    // Re-render detail if selected worker is still present; otherwise drop
    // the stale selection and hide the panel.
    if (selectedWorkerId) {
        const selectedDesc = descriptorMap.get(selectedWorkerId);
        if (selectedDesc) {
            renderWorkerDetail(selectedWorkerId, selectedDesc);
        } else {
            selectedWorkerId = null;
            document.getElementById('worker-detail').style.display = 'none';
        }
    }

    container.style.display = 'block';
}

// Windows
// FILETIME: 100ns ticks since 1601-01-01. Convert to JS Date.
const FILETIME_EPOCH_OFFSET_MS = 11644473600000n;

// Converts a FILETIME tick count (number or numeric string) to a Date.
// Returns null for missing values and for values BigInt() cannot parse
// (fractions, non-numeric strings), so one malformed entry cannot make the
// whole history refresh throw.
function filetimeToDate(ticks) {
    if (!ticks) return null;
    let ms;
    try {
        ms = BigInt(ticks) / 10000n - FILETIME_EPOCH_OFFSET_MS;
    } catch (error) {
        return null;
    }
    return new Date(Number(ms));
}

// Formats a Date as a local HH:MM:SS string, or '-' when there is no date.
function formatTime(date) {
    if (!date) return '-';
    return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', second: '2-digit' });
}

// Formats the elapsed time between two Dates as "N ms", "S.SS s" or "Mm SSs".
// Returns '-' when either date is missing or the interval is negative or
// not a finite number.
function formatDuration(startDate, endDate) {
    if (!startDate || !endDate) return '-';
    const ms = endDate - startDate;
    if (!Number.isFinite(ms) || ms < 0) return '-';
    if (ms < 1000) return ms + ' ms';
    if (ms < 60000) return (ms / 1000).toFixed(2) + ' s';
    // Round to whole seconds BEFORE splitting into minutes and seconds;
    // rounding the remainder on its own could produce "1m 60s".
    const totalSeconds = Math.round(ms / 1000);
    const m = Math.floor(totalSeconds / 60);
    const s = String(totalSeconds % 60).padStart(2, '0');
    return `${m}m ${s}s`;
}

// Fetches the most recent action history entries and rebuilds the history
// table, newest entry first. Cells are filled through textContent so that
// server-supplied strings are never interpreted as markup.
async function fetchActionHistory() {
    const data = await fetchJSON('/apply/jobs/history?limit=50');
    const entries = data.history || [];

    const empty = document.getElementById('action-history-empty');
    const container = document.getElementById('action-history-container');
    const tbody = document.getElementById('action-history-body');

    if (entries.length === 0) {
        empty.style.display = '';
        container.style.display = 'none';
        return;
    }

    empty.style.display = 'none';
    tbody.innerHTML = '';

    const BASE = 'padding: 6px 8px; border-bottom: 1px solid #21262d;';
    const TIME = ' text-align: right; font-size: 12px; white-space: nowrap;';
    const MONO_ID = ' color: #8b949e; font-family: monospace; font-size: 11px;';

    // Appends one <td> to `tr`; `text` is inserted as plain text.
    function addCell(tr, extraStyle, text) {
        const td = document.createElement('td');
        td.style.cssText = BASE + extraStyle;
        td.textContent = text;
        tr.appendChild(td);
        return td;
    }

    // Entries arrive oldest-first; reverse to show newest at top
    for (const entry of [...entries].reverse()) {
        const succeeded = entry.succeeded;
        // The badge markup is built only from constants, so innerHTML is safe here.
        let badge;
        if (succeeded == null) {
            badge = '<span class="status-badge" style="background:#21262d;color:#8b949e;">unknown</span>';
        } else if (succeeded) {
            badge = '<span class="status-badge success">ok</span>';
        } else {
            badge = '<span class="status-badge failure">failed</span>';
        }

        const desc = entry.actionDescriptor || {};
        const startDate = filetimeToDate(entry.time_Running);
        const endDate = filetimeToDate(entry.time_Completed ?? entry.time_Failed);

        const tr = document.createElement('tr');
        addCell(tr, ' color: #8b949e; text-align: right; font-family: monospace;', entry.lsn ?? '-');
        addCell(tr, ' text-align: center;', '').innerHTML = badge;
        addCell(tr, ' color: #f0f6fc;', desc.Function || '-');
        addCell(tr, ' color: #8b949e;' + TIME, formatTime(startDate));
        addCell(tr, ' color: #8b949e;' + TIME, formatTime(endDate));
        addCell(tr, ' color: #c9d1d9;' + TIME, formatDuration(startDate, endDate));
        addCell(tr, MONO_ID, entry.workerId || '-');
        addCell(tr, MONO_ID, entry.actionId || '-');
        tbody.appendChild(tr);
    }

    container.style.display = 'block';
}

// Runs one refresh cycle: all panels are fetched concurrently. If any fetch
// rejects, the error banner is shown and the timestamp is not advanced.
async function updateDashboard() {
    try {
        await Promise.all([
            fetchHealth(),
            fetchStats(),
            fetchSysInfo(),
            fetchWorkers(),
            fetchActionHistory()
        ]);

        clearError();
        updateTimestamp();
    } catch (error) {
        console.error('Error updating dashboard:', error);
        showError(error.message);
    }
}

// Start
updating + updateDashboard(); + setInterval(updateDashboard, REFRESH_INTERVAL); + </script> +</body> +</html> diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 1a929b026..ee783d2a6 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -23,6 +23,9 @@ #include <zenutil/service.h> #include "diag/logging.h" + +#include "compute/computeserver.h" + #include "storage/storageconfig.h" #include "storage/zenstorageserver.h" @@ -61,11 +64,19 @@ namespace zen { #if ZEN_PLATFORM_WINDOWS -template<class T> +/** Windows Service wrapper for Zen servers + * + * This class wraps a Zen server main entry point (the Main template parameter) + * into a Windows Service by implementing the WindowsService interface. + * + * The Main type needs to implement the virtual functions from the ZenServerMain + * base class, which provides the actual server logic. + */ +template<class Main> class ZenWindowsService : public WindowsService { public: - ZenWindowsService(typename T::Config& ServerOptions) : m_EntryPoint(ServerOptions) {} + ZenWindowsService(typename Main::Config& ServerOptions) : m_EntryPoint(ServerOptions) {} ZenWindowsService(const ZenWindowsService&) = delete; ZenWindowsService& operator=(const ZenWindowsService&) = delete; @@ -73,7 +84,7 @@ public: virtual int Run() override { return m_EntryPoint.Run(); } private: - T m_EntryPoint; + Main m_EntryPoint; }; #endif // ZEN_PLATFORM_WINDOWS @@ -84,6 +95,23 @@ private: namespace zen { +/** Application main entry point template + * + * This function handles common application startup tasks while allowing + * different server types to be plugged in via the Main template parameter. + * + * On Windows, this function also handles platform-specific service + * installation and uninstallation. + * + * The Main type needs to implement the virtual functions from the ZenServerMain + * base class, which provides the actual server logic. 
+ * + * The Main type is also expected to provide the following members: + * + * typedef Config -- Server configuration type, derived from ZenServerConfig + * typedef Configurator -- Server configuration handler type, implements ZenServerConfiguratorBase + * + */ template<class Main> int AppMain(int argc, char* argv[]) @@ -241,7 +269,12 @@ main(int argc, char* argv[]) auto _ = zen::MakeGuard([] { // Allow some time for worker threads to unravel, in an effort - // to prevent shutdown races in TLS object destruction + // to prevent shutdown races in TLS object destruction, mainly due to + // threads which we don't directly control (Windows thread pool) and + // therefore can't join. + // + // This isn't a great solution, but for now it seems to help reduce + // shutdown crashes observed in some situations. WaitForThreads(1000); }); @@ -249,6 +282,7 @@ main(int argc, char* argv[]) { kHub, kStore, + kCompute, kTest } ServerMode = kStore; @@ -258,10 +292,14 @@ main(int argc, char* argv[]) { ServerMode = kHub; } - else if (argv[1] == "store"sv) + else if ((argv[1] == "store"sv) || (argv[1] == "storage"sv)) { ServerMode = kStore; } + else if (argv[1] == "compute"sv) + { + ServerMode = kCompute; + } else if (argv[1] == "test"sv) { ServerMode = kTest; @@ -280,6 +318,13 @@ main(int argc, char* argv[]) break; case kHub: return AppMain<ZenHubServerMain>(argc, argv); + case kCompute: +#if ZEN_WITH_COMPUTE_SERVICES + return AppMain<ZenComputeServerMain>(argc, argv); +#else + fprintf(stderr, "compute services are not compiled in!\n"); + exit(5); +#endif default: case kStore: return AppMain<ZenStorageServerMain>(argc, argv); diff --git a/src/zenserver/storage/storageconfig.cpp b/src/zenserver/storage/storageconfig.cpp index 0f8ab1e98..089b6b572 100644 --- a/src/zenserver/storage/storageconfig.cpp +++ b/src/zenserver/storage/storageconfig.cpp @@ -797,6 +797,7 @@ ZenStorageServerCmdLineOptions::AddCacheOptions(cxxopts::Options& options, ZenSt 
cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"), ""); + options.add_option("compute", "", "lie-cpus", "Lie to upstream about CPU capabilities", cxxopts::value<int>(ServerOptions.LieCpu), ""); options.add_option("cache", "", "cache-bucket-maxblocksize", diff --git a/src/zenserver/storage/storageconfig.h b/src/zenserver/storage/storageconfig.h index d59d05cf6..b408b0c26 100644 --- a/src/zenserver/storage/storageconfig.h +++ b/src/zenserver/storage/storageconfig.h @@ -156,6 +156,7 @@ struct ZenStorageServerConfig : public ZenServerConfig ZenWorkspacesConfig WorksSpacesConfig; std::filesystem::path PluginsConfigFile; // Path to plugins config file bool ObjectStoreEnabled = false; + bool ComputeEnabled = true; std::string ScrubOptions; }; diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp index 2b74395c3..ff854b72d 100644 --- a/src/zenserver/storage/zenstorageserver.cpp +++ b/src/zenserver/storage/zenstorageserver.cpp @@ -182,6 +182,13 @@ ZenStorageServer::RegisterServices() #endif // ZEN_WITH_VFS m_Http->RegisterService(*m_AdminService); + +#if ZEN_WITH_COMPUTE_SERVICES + if (m_HttpFunctionService) + { + m_Http->RegisterService(*m_HttpFunctionService); + } +#endif } void @@ -267,6 +274,16 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatusService, m_StatsService, *m_BuildStore); } +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeEnabled) + { + ZEN_OTEL_SPAN("InitializeComputeService"); + + m_HttpFunctionService = + std::make_unique<compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerOptions.DataDir / "functions"); + } +#endif + #if ZEN_WITH_VFS m_VfsServiceImpl = std::make_unique<VfsServiceImpl>(); m_VfsServiceImpl->AddService(Ref<ProjectStore>(m_ProjectStore)); @@ -805,6 +822,10 @@ ZenStorageServer::Cleanup() Flush(); +#if 
ZEN_WITH_COMPUTE_SERVICES + m_HttpFunctionService.reset(); +#endif + m_AdminService.reset(); m_VfsService.reset(); m_VfsServiceImpl.reset(); diff --git a/src/zenserver/storage/zenstorageserver.h b/src/zenserver/storage/zenstorageserver.h index 5ccb587d6..456447a2a 100644 --- a/src/zenserver/storage/zenstorageserver.h +++ b/src/zenserver/storage/zenstorageserver.h @@ -6,6 +6,7 @@ #include <zenhttp/auth/authmgr.h> #include <zenhttp/auth/authservice.h> +#include <zenhttp/httpapiservice.h> #include <zenhttp/httptest.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/gc.h> @@ -23,6 +24,10 @@ #include "vfs/vfsservice.h" #include "workspaces/httpworkspaces.h" +#if ZEN_WITH_COMPUTE_SERVICES +# include <zencompute/httpfunctionservice.h> +#endif + namespace zen { class ZenStorageServer : public ZenServerBase @@ -34,11 +39,6 @@ public: ZenStorageServer(); ~ZenStorageServer(); - void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } - void SetTestMode(bool State) { m_TestMode = State; } - void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } - void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } - int Initialize(const ZenStorageServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry); void Run(); void Cleanup(); @@ -48,14 +48,9 @@ private: void InitializeStructuredCache(const ZenStorageServerConfig& ServerOptions); void Flush(); - bool m_IsDedicatedMode = false; - bool m_TestMode = false; - bool m_DebugOptionForcedCrash = false; - std::string m_StartupScrubOptions; - CbObject m_RootManifest; - std::filesystem::path m_DataRoot; - std::filesystem::path m_ContentRoot; - asio::steady_timer m_StateMarkerTimer{m_IoContext}; + std::string m_StartupScrubOptions; + CbObject m_RootManifest; + asio::steady_timer m_StateMarkerTimer{m_IoContext}; void EnqueueStateMarkerTimer(); void CheckStateMarker(); @@ -95,6 +90,11 @@ private: std::unique_ptr<HttpBuildStoreService> m_BuildStoreService; 
std::unique_ptr<VfsService> m_VfsService; std::unique_ptr<HttpAdminService> m_AdminService; + std::unique_ptr<HttpApiService> m_ApiService; + +#if ZEN_WITH_COMPUTE_SERVICES + std::unique_ptr<compute::HttpFunctionService> m_HttpFunctionService; +#endif }; struct ZenStorageServerConfigurator; diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index 6ee80dc62..9ab51beb2 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -2,7 +2,11 @@ target("zenserver") set_kind("binary") + if enable_unity then + add_rules("c++.unity_build", {batchsize = 4}) + end add_deps("zencore", + "zencompute", "zenhttp", "zennet", "zenremotestore", diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 7f9bf56a9..7bf6126df 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -18,6 +18,7 @@ #include <zencore/sentryintegration.h> #include <zencore/session.h> #include <zencore/string.h> +#include <zencore/system.h> #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> @@ -145,6 +146,13 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState:: InitializeSecuritySettings(ServerOptions); + if (ServerOptions.LieCpu) + { + SetCpuCountForReporting(ServerOptions.LieCpu); + + ZEN_INFO("Reporting concurrency: {}", ServerOptions.LieCpu); + } + m_StatusService.RegisterHandler("status", *this); m_Http->RegisterService(m_StatusService); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index efa46f361..5a8a079c0 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -43,6 +43,11 @@ public: void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } + void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } + void SetTestMode(bool State) { m_TestMode = 
State; } + protected: int Initialize(const ZenServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry); void Finalize(); @@ -55,6 +60,10 @@ protected: bool m_UseSentry = false; bool m_IsPowerCycle = false; + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + bool m_DebugOptionForcedCrash = false; + std::thread m_IoRunner; asio::io_context m_IoContext; void EnsureIoRunner(); @@ -72,6 +81,9 @@ protected: std::function<void()> m_IsReadyFunc; void OnReady(); + std::filesystem::path m_DataRoot; // Root directory for server state + std::filesystem::path m_ContentRoot; // Root directory for frontend content + Ref<HttpServer> m_Http; std::unique_ptr<IHttpRequestFilter> m_HttpRequestFilter; @@ -114,7 +126,6 @@ protected: private: void InitializeSecuritySettings(const ZenServerConfig& ServerOptions); }; - class ZenServerMain { public: diff --git a/src/zentest-appstub/xmake.lua b/src/zentest-appstub/xmake.lua index 97615e322..db3ff2e2d 100644 --- a/src/zentest-appstub/xmake.lua +++ b/src/zentest-appstub/xmake.lua @@ -5,6 +5,9 @@ target("zentest-appstub") set_group("tests") add_headerfiles("**.h") add_files("*.cpp") + add_deps("zencore") + add_packages("vcpkg::gsl-lite") -- this should ideally be propagated by the zencore dependency + add_packages("vcpkg::mimalloc") if is_os("linux") then add_syslinks("pthread") diff --git a/src/zentest-appstub/zentest-appstub.cpp b/src/zentest-appstub/zentest-appstub.cpp index 24cf21e97..926580d96 100644 --- a/src/zentest-appstub/zentest-appstub.cpp +++ b/src/zentest-appstub/zentest-appstub.cpp @@ -1,33 +1,408 @@ // Copyright Epic Games, Inc. All Rights Reserved. 
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
+
+#if ZEN_WITH_TESTS
+#    define ZEN_TEST_WITH_RUNNER 1
+#    include <zencore/testing.h>
+#endif
+
+#include <fmt/format.h>
+
 #include <stdio.h>
+#include <algorithm>
+#include <charconv>
 #include <chrono>
 #include <cstdlib>
 #include <cstring>
+#include <filesystem>
+#include <stdexcept>
+#include <string>
+#include <string_view>
+#include <system_error>
 #include <thread>
+#include <vector>
 
-using namespace std::chrono_literals;
+using namespace std::literals;
+using namespace zen;
+
+#if !defined(_MSC_VER)
+#    include <strings.h>  // POSIX home of strncasecmp
+#    define _strnicmp strncasecmp  // TEMPORARY WORKAROUND - should not be using this
+#endif
+
+// Some basic functions to implement some test "compute" functions
+
+/** Apply ROT13 to the ASCII letters of InputString.
+
+    Characters outside 'a'-'z' / 'A'-'Z' are copied through unchanged.
+ */
+std::string
+Rot13Function(std::string_view InputString)
+{
+    std::string OutputString{InputString};
+
+    std::transform(OutputString.begin(),
+                   OutputString.end(),
+                   OutputString.begin(),
+                   [](std::string::value_type c) -> std::string::value_type {
+                       // The arithmetic below is carried out in int; cast back explicitly
+                       // instead of relying on an implicit narrowing conversion
+                       if (c >= 'a' && c <= 'z')
+                       {
+                           return static_cast<std::string::value_type>('a' + (c - 'a' + 13) % 26);
+                       }
+                       else if (c >= 'A' && c <= 'Z')
+                       {
+                           return static_cast<std::string::value_type>('A' + (c - 'A' + 13) % 26);
+                       }
+                       else
+                       {
+                           return c;
+                       }
+                   });
+
+    return OutputString;
+}
+
+/** Return a copy of InputString with its characters in reverse order.
+ */
+std::string
+ReverseFunction(std::string_view InputString)
+{
+    std::string OutputString{InputString};
+    std::reverse(OutputString.begin(), OutputString.end());
+    return OutputString;
+}
+
+/** Return an unmodified copy of InputString.
+ */
+std::string
+IdentityFunction(std::string_view InputString)
+{
+    return std::string{InputString};
+}
+
+/** Ignore the input and return an empty string.
+ */
+std::string
+NullFunction(std::string_view)
+{
+    return {};
+}
+
+/** Build a compact binary object listing the test functions and their version GUIDs.
+
+    The object holds a "BuildSystemVersion" GUID and a "Functions" array of
+    { Name, Version } entries for Null, Identity, Rot13 and Reverse.
+
+    NOTE(review): main() builds a similar table inline where "Reverse" carries a
+    different version GUID - confirm which value is intended.
+ */
+zen::CbObject
+DescribeFunctions()
+{
+    CbObjectWriter Versions;
+    Versions << "BuildSystemVersion" << Guid::FromString("17fe280d-ccd8-4be8-a9d1-89c944a70969"sv);
+
+    Versions.BeginArray("Functions"sv);
+    Versions.BeginObject();
+    Versions << "Name"sv
+             << "Null"sv;
+    Versions << "Version"sv << Guid::FromString("00000000-0000-0000-0000-000000000000"sv);
+    Versions.EndObject();
+    Versions.BeginObject();
+    Versions << "Name"sv
+             << "Identity"sv;
+    Versions << "Version"sv << Guid::FromString("11111111-1111-1111-1111-111111111111"sv);
+    Versions.EndObject();
+    Versions.BeginObject();
+    Versions << "Name"sv
+             << "Rot13"sv;
+    Versions << "Version"sv << Guid::FromString("13131313-1313-1313-1313-131313131313"sv);
+    Versions.EndObject();
+    Versions.BeginObject();
+    Versions << "Name"sv
+             << "Reverse"sv;
+    Versions << "Version"sv << Guid::FromString("31313131-3131-3131-3131-313131313131"sv);
+    Versions.EndObject();
+    Versions.EndArray();
+
+    return Versions.Save();
+}
+
+/** Resolves input chunks from files stored beneath InputsRoot.
+
+    Each chunk is looked up in a file named after the hex form of its hash.
+ */
+struct ContentResolver
+{
+    std::filesystem::path InputsRoot;
+
+    /** Load the chunk file for Hash and validate it against the expected raw size and hash.
+
+        Throws std::runtime_error if the raw size or raw hash decoded from the file
+        does not match ExpectedSize / Hash.
+     */
+    CompressedBuffer ResolveChunk(IoHash Hash, uint64_t ExpectedSize)
+    {
+        std::filesystem::path ChunkPath   = InputsRoot / Hash.ToHexString();
+        IoBuffer              ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkPath);
+
+        IoHash           RawHash;
+        uint64_t         RawSize      = 0;
+        CompressedBuffer AsCompressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkBuffer), RawHash, RawSize);
+
+        if (RawSize != ExpectedSize)
+        {
+            // Report the decoded raw size (the value actually compared), not the size of the file on disk
+            throw std::runtime_error(
+                fmt::format("chunk size mismatch - expected {}, got {} for '{}'", ExpectedSize, RawSize, ChunkPath));
+        }
+        if (RawHash != Hash)
+        {
+            throw std::runtime_error(fmt::format("chunk hash mismatch - expected {}, got {} for '{}'", Hash, RawHash, ChunkPath));
+        }
+
+        return AsCompressed;
+    }
+};
+
+/** Execute the function named by Action["Function"] on the action's "Source" input.
+
+    The input chunk is resolved through ChunkResolver using the RawHash / RawSize
+    fields found under Inputs.Source. The result package carries an object with a
+    single-entry "Values" array plus the compressed output as an attachment.
+
+    An unrecognized function name yields a default-constructed (empty) package.
+ */
+zen::CbPackage
+ExecuteFunction(CbObject Action, ContentResolver ChunkResolver)
+{
+    // Shared pipeline: resolve + decompress the input, run Func on it, then package the compressed output
+    auto Apply = [&](auto Func) {
+        zen::CbPackage Result;
+        auto           Source = Action["Inputs"sv].AsObjectView()["Source"sv].AsObjectView();
+
+        IoHash   InputRawHash = Source["RawHash"sv].AsHash();
+        uint64_t InputRawSize = Source["RawSize"sv].AsUInt64();
+
+        zen::CompressedBuffer InputData = ChunkResolver.ResolveChunk(InputRawHash, InputRawSize);
+        SharedBuffer          Input     = InputData.Decompress();
+
+        std::string           Output = Func(std::string_view(static_cast<const char*>(Input.GetData()), Input.GetSize()));
+        zen::CompressedBuffer OutputData =
+            zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Output), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4);
+        IoHash OutputRawHash = OutputData.DecodeRawHash();
+
+        CbAttachment OutputAttachment(std::move(OutputData), OutputRawHash);
+
+        CbObjectWriter Cbo;
+        Cbo.BeginArray("Values"sv);
+        Cbo.BeginObject();
+        Cbo << "Id" << Oid{1, 2, 3};
+        Cbo.AddAttachment("RawHash", OutputAttachment);
+        Cbo << "RawSize" << Output.size();
+        Cbo.EndObject();
+        Cbo.EndArray();
+
+        Result.SetObject(Cbo.Save());
+        Result.AddAttachment(std::move(OutputAttachment));
+        return Result;
+    };
+
+    std::string_view Function = Action["Function"sv].AsString();
+
+    if (Function == "Rot13"sv)
+    {
+        return Apply(Rot13Function);
+    }
+    else if (Function == "Reverse"sv)
+    {
+        return Apply(ReverseFunction);
+    }
+    else if (Function == "Identity"sv)
+    {
+        return Apply(IdentityFunction);
+    }
+    else if (Function == "Null"sv)
+    {
+        return Apply(NullFunction);
+    }
+    else
+    {
+        return {};
+    }
+}
+
+/* This implements a minimal application to help testing of process launch-related
+   functionality
+
+   It also mimics the DDC2 worker command line interface, so it may be used to
+   exercise compute infrastructure.
+ */
 
 int
 main(int argc, char* argv[])
 {
     int ExitCode = 0;
 
-    for (int i = 0; i < argc; ++i)
+    try
     {
-        if (std::strncmp(argv[i], "-t=", 3) == 0)
+        std::filesystem::path              BasePath    = std::filesystem::current_path();
+        std::filesystem::path              InputPath   = std::filesystem::current_path() / "Inputs";
+        std::filesystem::path              OutputPath  = std::filesystem::current_path() / "Outputs";
+        std::filesystem::path              VersionPath = std::filesystem::current_path() / "Versions";
+        std::vector<std::filesystem::path> ActionPaths;
+
+        /*
+            GetSwitchValues(TEXT("-B="), ActionPathPatterns);
+            GetSwitchValues(TEXT("-Build="), ActionPathPatterns);
+
+            GetSwitchValues(TEXT("-I="), InputDirectoryPaths);
+            GetSwitchValues(TEXT("-Input="), InputDirectoryPaths);
+
+            GetSwitchValues(TEXT("-O="), OutputDirectoryPaths);
+            GetSwitchValues(TEXT("-Output="), OutputDirectoryPaths);
+
+            GetSwitchValues(TEXT("-V="), VersionPaths);
+            GetSwitchValues(TEXT("-Version="), VersionPaths);
+        */
+
+        // Returns the text following the first '=' in Arg, or an empty view if there is no '='
+        auto SplitArg = [](const char* Arg) -> std::string_view {
+            std::string_view ArgView{Arg};
+            if (auto SplitPos = ArgView.find_first_of('='); SplitPos != std::string_view::npos)
+            {
+                return ArgView.substr(SplitPos + 1);
+            }
+            else
+            {
+                return {};
+            }
+        };
+
+        // Parses a leading integer from Arg; throws std::invalid_argument when from_chars reports an error
+        auto ParseIntArg = [](std::string_view Arg) -> int {
+            int        Rv     = 0;
+            const auto Result = std::from_chars(Arg.data(), Arg.data() + Arg.size(), Rv);
+
+            if (Result.ec != std::errc{})
+            {
+                throw std::invalid_argument(fmt::format("bad argument (not an integer): {}", Arg));
+            }
+
+            return Rv;
+        };
+
+        for (int i = 1; i < argc; ++i)
+        {
+            std::string_view Arg = argv[i];
+
+            // compare() yields non-zero when the first character is not '-': skip anything that is not a switch
+            if (Arg.compare(0, 1, "-"))
+            {
+                continue;
+            }
+
+            if (std::strncmp(argv[i], "-t=", 3) == 0)
+            {
+                const int SleepTime = std::atoi(argv[i] + 3);
+
+                printf("[zentest] sleeping for %ds...\n", SleepTime);
+
+                std::this_thread::sleep_for(SleepTime * 1s);
+            }
+            else if (std::strncmp(argv[i], "-f=", 3) == 0)
+            {
+                // Force a "failure" process exit code to return to the invoker
+
+                // This may throw for invalid arguments, which makes this useful for
+                // testing exception handling
+                std::string_view ErrorArg = SplitArg(argv[i]);
+                ExitCode                  = ParseIntArg(ErrorArg);
+            }
+            else if ((_strnicmp(argv[i], "-input=", 7) == 0) || (_strnicmp(argv[i], "-i=", 3) == 0))
+            {
+                /* mimic DDC2
+
+                GetSwitchValues(TEXT("-I="), InputDirectoryPaths);
+                GetSwitchValues(TEXT("-Input="), InputDirectoryPaths);
+                */
+
+                std::string_view InputArg = SplitArg(argv[i]);
+                InputPath                 = InputArg;
+            }
+            else if ((_strnicmp(argv[i], "-output=", 8) == 0) || (_strnicmp(argv[i], "-o=", 3) == 0))
+            {
+                /* mimic DDC2 handling of where files storing output chunk files are directed
+
+                GetSwitchValues(TEXT("-O="), OutputDirectoryPaths);
+                GetSwitchValues(TEXT("-Output="), OutputDirectoryPaths);
+                */
+
+                std::string_view OutputArg = SplitArg(argv[i]);
+                OutputPath                 = OutputArg;
+            }
+            // "-version=" is 9 characters long; compare the whole prefix so the '=' is checked too
+            else if ((_strnicmp(argv[i], "-version=", 9) == 0) || (_strnicmp(argv[i], "-v=", 3) == 0))
+            {
+                /* mimic DDC2
+
+                GetSwitchValues(TEXT("-V="), VersionPaths);
+                GetSwitchValues(TEXT("-Version="), VersionPaths);
+                */
+
+                std::string_view VersionArg = SplitArg(argv[i]);
+                VersionPath                 = VersionArg;
+            }
+            else if ((_strnicmp(argv[i], "-build=", 7) == 0) || (_strnicmp(argv[i], "-b=", 3) == 0))
+            {
+                /* mimic DDC2
+
+                GetSwitchValues(TEXT("-B="), ActionPathPatterns);
+                GetSwitchValues(TEXT("-Build="), ActionPathPatterns);
+                */
+
+                std::string_view      BuildActionArg = SplitArg(argv[i]);
+                std::filesystem::path ActionPath{BuildActionArg};
+                ActionPaths.push_back(ActionPath);
+
+                // NOTE(review): this also discards any exit code requested by an earlier -f= switch - confirm intended
+                ExitCode = 0;
+            }
+        }
+
+        // Emit version information
+
+        if (!VersionPath.empty())
         {
-            const int SleepTime = std::atoi(argv[i] + 3);
+            CbObjectWriter Version;
+
+            Version << "BuildSystemVersion" << Guid::FromString("17fe280d-ccd8-4be8-a9d1-89c944a70969"sv);
+
+            Version.BeginArray("Functions");
+
+            Version.BeginObject();
+            Version << "Name"
+                    << "Rot13"
+                    << "Version" << Guid::FromString("13131313-1313-1313-1313-131313131313"sv);
+            Version.EndObject();
 
-            printf("[zentest] sleeping for %ds...\n", SleepTime);
+            Version.BeginObject();
+            Version << "Name"
+                    << "Reverse"
+                    << "Version" << Guid::FromString("98765432-1000-0000-0000-000000000000"sv);
+            Version.EndObject();
 
-            std::this_thread::sleep_for(SleepTime * 1s);
+            Version.BeginObject();
+            Version << "Name"
+                    << "Identity"
+                    << "Version" << Guid::FromString("11111111-1111-1111-1111-111111111111"sv);
+            Version.EndObject();
+
+            Version.BeginObject();
+            Version << "Name"
+                    << "Null"
+                    << "Version" << Guid::FromString("00000000-0000-0000-0000-000000000000"sv);
+            Version.EndObject();
+
+            Version.EndArray();
+            CbObject VersionObject = Version.Save();
+
+            BinaryWriter Writer;
+            zen::SaveCompactBinary(Writer, VersionObject);
+            zen::WriteFile(VersionPath, IoBufferBuilder::MakeFromMemory(Writer.GetView()));
         }
-        else if (std::strncmp(argv[i], "-f=", 3) == 0)
+
+        // Evaluate actions
+
+        ContentResolver Resolver;
+        Resolver.InputsRoot = InputPath;
+
+        // ActionPath is taken by value on purpose: replace_extension() below modifies it in place
+        for (std::filesystem::path ActionPath : ActionPaths)
         {
-            ExitCode = std::atoi(argv[i] + 3);
+            IoBuffer  ActionDescBuffer = ReadFile(ActionPath).Flatten();
+            CbObject  ActionDesc       = LoadCompactBinaryObject(ActionDescBuffer);
+            CbPackage Result           = ExecuteFunction(ActionDesc, Resolver);
+            CbObject  ResultObject     = Result.GetObject();
+
+            // The result object is written next to the action file, with the extension swapped for ".output"
+            BinaryWriter Writer;
+            zen::SaveCompactBinary(Writer, ResultObject);
+            zen::WriteFile(ActionPath.replace_extension(".output"), IoBufferBuilder::MakeFromMemory(Writer.GetView()));
+
+            // Also marshal outputs
+
+            for (const auto& Attachment : Result.GetAttachments())
+            {
+                const CompositeBuffer& AttachmentBuffer = Attachment.AsCompressedBinary().GetCompressed();
+                zen::WriteFile(OutputPath / Attachment.GetHash().ToHexString(), AttachmentBuffer.Flatten().AsIoBuffer());
+            }
         }
     }
+    catch (const std::exception& Ex)
+    {
+        printf("[zentest] exception caught in main: '%s'\n", Ex.what());
+
+        ExitCode = 99;
+    }
 
     printf("[zentest] exiting with exit code: %d\n", ExitCode);
diff --git a/thirdparty/xmake.lua
b/thirdparty/xmake.lua index f079d803d..07605a016 100644 --- a/thirdparty/xmake.lua +++ b/thirdparty/xmake.lua @@ -86,7 +86,7 @@ target("blake3") if is_os("windows") then add_cflags("/experimental:c11atomics") - add_cflags("/wd4245") -- conversion from 'type1' to 'type2', possible loss of data + add_cflags("/wd4245", {force = true}) -- conversion from 'type1' to 'type2', possible loss of data elseif is_os("macosx") then add_cflags("-Wno-unused-function") end @@ -120,6 +120,9 @@ if has_config("zensentry") and not use_asan then add_requires("sentry-native 0.12.1", {configs = {backend = "crashpad"}}) end end + +enable_unity = false + --add_rules("c++.unity_build") if is_mode("release") then @@ -240,6 +243,14 @@ else add_defines("ZEN_WITH_HTTPSYS=0") end +option("zencompute") + set_default(false) + set_showmenu(true) + set_description("Enable compute services endpoint") +option_end() +add_define_by_config("ZEN_WITH_COMPUTE_SERVICES", "zencompute") + + if is_os("windows") then add_defines("UE_MEMORY_TRACE_AVAILABLE=1") option("zenmemtrack") @@ -272,6 +283,7 @@ includes("src/zencore", "src/zencore-test") includes("src/zenhttp", "src/zenhttp-test") includes("src/zennet", "src/zennet-test") includes("src/zenremotestore", "src/zenremotestore-test") +includes("src/zencompute", "src/zencompute-test") includes("src/zenstore", "src/zenstore-test") includes("src/zentelemetry", "src/zentelemetry-test") includes("src/zenutil", "src/zenutil-test") |