aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute
diff options
context:
space:
mode:
authorzousar <[email protected]>2026-02-18 23:19:14 -0700
committerzousar <[email protected]>2026-02-18 23:19:14 -0700
commit2ba28acaf034722452f82cfb07afc0a4bb90eeab (patch)
treec00dea385597180673be6e02aca6c07d9ef6ec00 /src/zencompute
parentupdatefrontend (diff)
parentstructured compute basics (#714) (diff)
downloadzen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.tar.xz
zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.zip
Merge branch 'main' into zs/web-ui-improvements
Diffstat (limited to 'src/zencompute')
-rw-r--r--src/zencompute/actionrecorder.cpp258
-rw-r--r--src/zencompute/actionrecorder.h91
-rw-r--r--src/zencompute/functionrunner.cpp112
-rw-r--r--src/zencompute/functionrunner.h207
-rw-r--r--src/zencompute/functionservice.cpp957
-rw-r--r--src/zencompute/httpfunctionservice.cpp709
-rw-r--r--src/zencompute/httporchestrator.cpp81
-rw-r--r--src/zencompute/include/zencompute/functionservice.h132
-rw-r--r--src/zencompute/include/zencompute/httpfunctionservice.h73
-rw-r--r--src/zencompute/include/zencompute/httporchestrator.h44
-rw-r--r--src/zencompute/include/zencompute/recordingreader.h127
-rw-r--r--src/zencompute/include/zencompute/zencompute.h11
-rw-r--r--src/zencompute/localrunner.cpp722
-rw-r--r--src/zencompute/localrunner.h100
-rw-r--r--src/zencompute/recordingreader.cpp335
-rw-r--r--src/zencompute/remotehttprunner.cpp457
-rw-r--r--src/zencompute/remotehttprunner.h80
-rw-r--r--src/zencompute/xmake.lua11
-rw-r--r--src/zencompute/zencompute.cpp12
19 files changed, 4519 insertions, 0 deletions
diff --git a/src/zencompute/actionrecorder.cpp b/src/zencompute/actionrecorder.cpp
new file mode 100644
index 000000000..04c4b5141
--- /dev/null
+++ b/src/zencompute/actionrecorder.cpp
@@ -0,0 +1,258 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "actionrecorder.h"
+
+#include "functionrunner.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# define ZEN_CONCRT_AVAILABLE 1
+#else
+# define ZEN_CONCRT_AVAILABLE 0
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+/// Constructs a writer with no files attached; nothing is touched on disk until Open() is called.
+RecordingFileWriter::RecordingFileWriter() = default;
+
+/// Finalizes the recording by delegating to Close().
+RecordingFileWriter::~RecordingFileWriter()
+{
+    this->Close();
+}
+
+/**
+ * Open (truncating) the data file at FilePath together with a sibling table-of-contents
+ * file that has the same name but a ".ztoc" extension.
+ *
+ * A 16 byte magic header is written at the start of the data file and the TOC writer
+ * is primed with a version field and an open "toc" array that AppendObject() adds to.
+ */
+void
+RecordingFileWriter::Open(std::filesystem::path FilePath)
+{
+    using namespace std::literals;
+
+    m_File.Open(FilePath, BasicFile::Mode::kTruncate);
+    m_File.Write("----DDC2----DATA", 16, 0);
+
+    std::filesystem::path TocPath = FilePath.replace_extension(".ztoc");
+    m_TocFile.Open(TocPath, BasicFile::Mode::kTruncate);
+
+    m_TocWriter << "version"sv << 1;
+    m_TocWriter.BeginArray("toc"sv);
+
+    // Set last on purpose: a non-zero offset doubles as the "fully opened" marker that
+    // Close() checks, so it must only become visible once every step above succeeded
+    m_FileOffset = 16;
+}
+
+/**
+ * Terminate the "toc" array and write the table of contents to the TOC file.
+ *
+ * Safe to call when Open() never ran (or failed part-way) and safe to call more than
+ * once; in both cases the call is a no-op. This matters because the destructor always
+ * calls Close(). A failure to write the TOC is logged rather than thrown.
+ */
+void
+RecordingFileWriter::Close()
+{
+    if (m_FileOffset == 0)
+    {
+        // Never opened successfully, or already closed: there is no open array to end
+        return;
+    }
+
+    m_TocWriter.EndArray();
+    CbObject Toc = m_TocWriter.Save();
+
+    std::error_code Ec;
+    m_TocFile.WriteAll(Toc.GetBuffer().AsIoBuffer(), Ec);
+
+    if (Ec)
+    {
+        ZEN_WARN("failed writing recording TOC: {}", Ec.message());
+    }
+
+    // Mark as closed so a second Close() (e.g. from the destructor) does nothing
+    m_FileOffset = 0;
+}
+
+/**
+ * Append a serialized object to the data file and add a matching TOC entry.
+ *
+ * Each TOC entry is an array of [object hash, file offset, size in bytes].
+ * Throws std::system_error if the write to the data file reports an error.
+ */
+void
+RecordingFileWriter::AppendObject(const CbObject& Object, const IoHash& ObjectHash)
+{
+    RwLock::ExclusiveLockScope WriteLock(m_FileLock);
+
+    const MemoryView Payload = Object.GetBuffer().GetView();
+
+    std::error_code WriteError;
+    m_File.Write(Payload, m_FileOffset, WriteError);
+
+    if (WriteError)
+    {
+        throw std::system_error(WriteError, "failed writing to archive");
+    }
+
+    // Record where the payload landed before advancing the write cursor
+    m_TocWriter.BeginArray();
+    m_TocWriter.AddHash(ObjectHash);
+    m_TocWriter.AddInteger(m_FileOffset);
+    m_TocWriter.AddInteger(gsl::narrow<int>(Payload.GetSize()));
+    m_TocWriter.EndArray();
+
+    m_FileOffset += Payload.GetSize();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Prepare a recording directory and open the recording files inside it.
+ *
+ * The directory is created if needed and then cleaned (including read-only files).
+ * Failures of either step are logged as warnings and do not abort construction.
+ * Afterwards "workers.zdat" and "actions.zdat" are opened and a CID store rooted at
+ * "<dir>/cid" is initialized.
+ */
+ActionRecorder::ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath)
+: m_ChunkResolver(InChunkResolver)
+, m_RecordingLogDir(RecordingLogPath)
+{
+    std::error_code Ec;
+
+    CreateDirectories(m_RecordingLogDir, Ec);
+    if (Ec)
+    {
+        ZEN_WARN("Could not create directory '{}': {}", m_RecordingLogDir, Ec.message());
+    }
+
+    CleanDirectory(m_RecordingLogDir, /* ForceRemoveReadOnlyFiles */ true, Ec);
+    if (Ec)
+    {
+        ZEN_WARN("Could not clean directory '{}': {}", m_RecordingLogDir, Ec.message());
+    }
+
+    m_WorkersFile.Open(m_RecordingLogDir / "workers.zdat");
+    m_ActionsFile.Open(m_RecordingLogDir / "actions.zdat");
+
+    // The recording gets its own chunk store below the recording directory
+    CidStoreConfiguration StoreConfig;
+    StoreConfig.RootDirectory      = m_RecordingLogDir / "cid";
+    StoreConfig.HugeValueThreshold = 128 * 1024 * 1024;
+
+    m_CidStore.Initialize(StoreConfig);
+}
+
+/// Destruction performs the same work as an explicit Shutdown().
+ActionRecorder::~ActionRecorder()
+{
+    this->Shutdown();
+}
+
+/// Flushes the recording's CID store.
+void
+ActionRecorder::Shutdown()
+{
+    m_CidStore.Flush();
+}
+
+/**
+ * Record a worker descriptor and copy every chunk it references into the recording.
+ *
+ * The descriptor object is appended to the workers file. Attachments shipped inside
+ * the package are stored first; any attachment the descriptor references that was not
+ * part of the package is then fetched through the chunk resolver. Chunks that cannot
+ * be resolved are logged as warnings.
+ */
+void
+ActionRecorder::RegisterWorker(const CbPackage& WorkerPackage)
+{
+    const IoHash WorkerId = WorkerPackage.GetObjectHash();
+
+    m_WorkersFile.AppendObject(WorkerPackage.GetObject(), WorkerId);
+
+    std::unordered_set<IoHash> SeenChunks;
+    uint64_t                   NewBytes = 0;
+
+    // Copies a chunk into the recording's store and counts its size if it was not already there
+    auto StoreChunk = [&](const IoBuffer& Payload, const IoHash& Cid) {
+        const CidStore::InsertResult Inserted = m_CidStore.AddChunk(Payload, Cid, CidStore::InsertMode::kCopyOnly);
+
+        if (Inserted.New)
+        {
+            NewBytes += Payload.GetSize();
+        }
+    };
+
+    // Pass 1: attachments carried by the worker package itself
+
+    for (const CbAttachment& Attachment : WorkerPackage.GetAttachments())
+    {
+        CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+        IoBuffer         Payload    = Compressed.GetCompressed().Flatten().AsIoBuffer();
+        const IoHash     RawHash    = Compressed.DecodeRawHash();
+
+        StoreChunk(Payload, RawHash);
+        SeenChunks.insert(RawHash);
+    }
+
+    // Pass 2: the package need not contain every attachment, so pull the remaining
+    // referenced chunks through the resolver. This keeps the recording self-contained
+    // rather than pointing at data in the main CID store
+
+    CbObject Descriptor = WorkerPackage.GetObject();
+
+    Descriptor.IterateAttachments([&](const CbFieldView AttachmentField) {
+        const IoHash Cid = AttachmentField.GetValue().AsHash();
+
+        if (SeenChunks.contains(Cid))
+        {
+            return;
+        }
+
+        if (IoBuffer Payload = m_ChunkResolver.FindChunkByCid(Cid))
+        {
+            StoreChunk(Payload, Cid);
+        }
+        else
+        {
+            ZEN_WARN("RegisterWorker: could not resolve attachment chunk {} for worker {}", Cid, WorkerId);
+        }
+
+        // Unresolved chunks are remembered too, so they are only reported once
+        SeenChunks.insert(Cid);
+    });
+
+    ZEN_INFO("recorded worker {} with {} attachments ({} bytes)", WorkerId, SeenChunks.size(), NewBytes);
+}
+
+/**
+ * Copy all chunks referenced by an action into the recording and, if every chunk
+ * could be resolved, append the action object to the actions file.
+ *
+ * Compressed chunks whose compressor is reported as NotSet are decompressed and
+ * re-compressed (Mermaid / Fast) before being stored.
+ *
+ * @return true when the action was recorded, false when at least one referenced
+ *         chunk could not be resolved (the action itself is then not appended)
+ */
+bool
+ActionRecorder::RecordAction(Ref<RunnerAction> Action)
+{
+    bool EveryChunkResolved = true;
+
+    Action->ActionObj.IterateAttachments([&](CbFieldView Field) {
+        IoHash   Cid     = Field.AsHash();
+        IoBuffer Payload = m_ChunkResolver.FindChunkByCid(Cid);
+
+        if (!Payload)
+        {
+            EveryChunkResolved = false;
+
+            ZEN_WARN("could not resolve chunk {}", Cid);
+            return;
+        }
+
+        if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+        {
+            IoHash           RawHash;
+            uint64_t         RawSize = 0;
+            CompressedBuffer Source  = CompressedBuffer::FromCompressed(SharedBuffer(Payload), /* out */ RawHash, /* out*/ RawSize);
+
+            OodleCompressor       Method;
+            OodleCompressionLevel Level;
+            uint64_t              BlockSize = 0;
+
+            const bool HaveParameters = Source.TryGetCompressParameters(/* out */ Method, /* out */ Level, /* out */ BlockSize);
+
+            if (HaveParameters && Method == OodleCompressor::NotSet)
+            {
+                CompositeBuffer  Raw      = Source.DecompressToComposite();
+                CompressedBuffer Repacked = CompressedBuffer::Compress(std::move(Raw),
+                                                                       OodleCompressor::Mermaid,
+                                                                       OodleCompressionLevel::Fast,
+                                                                       BlockSize);
+
+                Payload = Repacked.GetCompressed().Flatten().AsIoBuffer();
+            }
+        }
+
+        const uint64_t StoredSize = Payload.GetSize();
+
+        m_CidStore.AddChunk(Payload, Cid, CidStore::InsertMode::kCopyOnly);
+        ++m_ChunkCounter;
+        m_ChunkBytesCounter.fetch_add(StoredSize);
+    });
+
+    if (!EveryChunkResolved)
+    {
+        return false;
+    }
+
+    m_ActionsFile.AppendObject(Action->ActionObj, Action->ActionId);
+    ++m_ActionsCounter;
+
+    return true;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/actionrecorder.h b/src/zencompute/actionrecorder.h
new file mode 100644
index 000000000..9cc2b44a2
--- /dev/null
+++ b/src/zencompute/actionrecorder.h
@@ -0,0 +1,91 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+#include <zencompute/zencompute.h>
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
+#include <zenstore/zenstore.h>
+
+#include <filesystem>
+#include <functional>
+#include <map>
+#include <unordered_map>
+
+namespace zen {
+class CbObject;
+class CbPackage;
+struct IoHash;
+} // namespace zen
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Writes a sequence of compact binary objects to a data file and keeps a table of
+ * contents (hash, offset, size per object) which is written to a separate TOC file
+ * on Close().
+ */
+struct RecordingFileWriter
+{
+    RecordingFileWriter();
+    ~RecordingFileWriter();
+
+    // Not movable (which also suppresses the implicit copy operations)
+    RecordingFileWriter(RecordingFileWriter&&) = delete;
+    RecordingFileWriter& operator=(RecordingFileWriter&&) = delete;
+
+    void Open(std::filesystem::path FilePath);
+    void Close();
+    void AppendObject(const CbObject& Object, const IoHash& ObjectHash);
+
+    RwLock         m_FileLock;        // taken exclusively by AppendObject()
+    BasicFile      m_File;            // object data file
+    uint64_t       m_FileOffset = 0;  // offset at which the next object is written
+    CbObjectWriter m_TocWriter;       // accumulates the table of contents
+    BasicFile      m_TocFile;         // receives the saved table of contents
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Recording "runner" implementation
+ *
+ * This class writes out all actions and their attachments to a recording directory
+ * in a format that can be read back by the RecordingReader.
+ *
+ * The contents of the recording directory will be self-contained, with all referenced
+ * attachments stored in the recording directory itself, so that the recording can be
+ * moved or shared without needing to maintain references to the main CID store.
+ *
+ */
+
+class ActionRecorder
+{
+public:
+    /// Prepares RecordingLogPath as the recording directory; InChunkResolver is used to fetch referenced chunks.
+    ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath);
+    ~ActionRecorder();
+
+    ActionRecorder(const ActionRecorder&) = delete;
+    ActionRecorder& operator=(const ActionRecorder&) = delete;
+
+    /// Flushes the recording's chunk store.
+    void Shutdown();
+
+    /// Records a worker descriptor together with the chunks it references.
+    void RegisterWorker(const CbPackage& WorkerPackage);
+
+    /// Records an action and its chunks; returns false if any chunk could not be resolved.
+    bool RecordAction(Ref<RunnerAction> Action);
+
+private:
+    ChunkResolver&        m_ChunkResolver;
+    std::filesystem::path m_RecordingLogDir;
+
+    RecordingFileWriter   m_WorkersFile;
+    RecordingFileWriter   m_ActionsFile;
+    GcManager             m_Gc;
+    CidStore              m_CidStore{m_Gc};
+    std::atomic<int>      m_ChunkCounter{0};       // chunks stored by RecordAction()
+    std::atomic<uint64_t> m_ChunkBytesCounter{0};  // bytes stored by RecordAction()
+    std::atomic<int>      m_ActionsCounter{0};     // actions appended by RecordAction()
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/functionrunner.cpp b/src/zencompute/functionrunner.cpp
new file mode 100644
index 000000000..8e7c12b2b
--- /dev/null
+++ b/src/zencompute/functionrunner.cpp
@@ -0,0 +1,112 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "functionrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/filesystem.h>
+
+# include <fmt/format.h>
+# include <vector>
+
+namespace zen::compute {
+
+/// Action dumps (see MaybeDumpAction) go into the "actions" directory below BasePath.
+FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions")
+{
+}
+
+/// Out-of-line definition required because the destructor is declared pure virtual.
+FunctionRunner::~FunctionRunner() = default;
+
+/// Default capacity reported by a runner that does not override this: one action.
+size_t
+FunctionRunner::QueryCapacity()
+{
+    constexpr size_t DefaultCapacity = 1;
+    return DefaultCapacity;
+}
+
+/**
+ * Default batch submission: forwards each action to SubmitAction() in order.
+ *
+ * @return one SubmitResult per input action, in the same order as Actions
+ */
+std::vector<SubmitResult>
+FunctionRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    const size_t ActionCount = Actions.size();
+
+    std::vector<SubmitResult> Outcomes;
+    Outcomes.reserve(ActionCount);
+
+    for (size_t Index = 0; Index < ActionCount; ++Index)
+    {
+        Outcomes.push_back(SubmitAction(Actions[Index]));
+    }
+
+    return Outcomes;
+}
+
+/// When action dumping is enabled, writes the action object to "<actions dir>/<ActionLsn>.ddb".
+void
+FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject)
+{
+    if (!m_DumpActions)
+    {
+        return;
+    }
+
+    const std::string           FileName = fmt::format("{}.ddb", ActionLsn);
+    const std::filesystem::path DumpPath = m_ActionsPath / FileName;
+
+    zen::WriteFile(DumpPath, IoBuffer(ActionObject.GetBuffer().AsIoBuffer()));
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+/// A new action starts in State::New; the creation time is stored as that state's timestamp.
+RunnerAction::RunnerAction(FunctionServiceSession* OwnerSession) : m_OwnerSession(OwnerSession)
+{
+    constexpr int NewSlot = static_cast<int>(State::New);
+    Timestamps[NewSlot]   = DateTime::Now().GetTicks();
+}
+
+RunnerAction::~RunnerAction() = default;
+
+/**
+ * Advance the action to NewState and notify the owning session.
+ *
+ * States only move forward: a request for the current state or an earlier one is
+ * ignored and leaves the recorded timestamps untouched. On a successful transition
+ * the timestamp for NewState is recorded and the owner session is told about the
+ * update via PostUpdate().
+ */
+void
+RunnerAction::SetActionState(State NewState)
+{
+    ZEN_ASSERT(NewState < State::_Count);
+
+    const auto NowTicks = DateTime::Now().GetTicks();
+
+    State CurrentState = m_ActionState.load();
+
+    // Only forward transitions are allowed; anything else falls out of the loop
+    while (CurrentState < NewState)
+    {
+        // Stamp before publishing the state so that anyone observing NewState also
+        // sees its timestamp. Stamping only on a forward transition keeps rejected
+        // calls from overwriting the timestamp of a state that was already passed
+        this->Timestamps[static_cast<int>(NewState)] = NowTicks;
+
+        if (m_ActionState.compare_exchange_strong(CurrentState, NewState))
+        {
+            m_OwnerSession->PostUpdate(this);
+
+            return;
+        }
+
+        // A failed exchange refreshed CurrentState with the value another thread
+        // stored; the loop condition re-validates the transition against it
+    }
+}
+
+/// Takes ownership of the result package produced for this action.
+void
+RunnerAction::SetResult(CbPackage&& Result)
+{
+    m_Result = std::move(Result);
+}
+
+/// Returns the stored result package; asserts that the action is Completed or Failed.
+CbPackage&
+RunnerAction::GetResult()
+{
+    const bool Finished = IsCompleted();
+    ZEN_ASSERT(Finished);
+
+    return m_Result;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h
new file mode 100644
index 000000000..6fd0d84cc
--- /dev/null
+++ b/src/zencompute/functionrunner.h
@@ -0,0 +1,207 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <filesystem>
+# include <vector>
+
+namespace zen::compute {
+
+/// Outcome of handing an action to a runner.
+struct SubmitResult
+{
+    bool        IsAccepted = false;  // true when a runner took the action
+    std::string Reason;              // optional explanation, e.g. why the action was not accepted
+};
+
+/**
+ * Base interface for classes implementing a remote execution "runner".
+ *
+ * Concrete runners must implement shutdown, worker registration, single-action
+ * submission, the submitted-action count and a health check. QueryCapacity() and
+ * SubmitActions() have default implementations that may be overridden.
+ */
+class FunctionRunner : public RefCounted
+{
+    // Runners are not movable
+    FunctionRunner(FunctionRunner&&) = delete;
+    FunctionRunner& operator=(FunctionRunner&&) = delete;
+
+public:
+    FunctionRunner(std::filesystem::path BasePath);
+    virtual ~FunctionRunner() = 0;
+
+    virtual void Shutdown()                                     = 0;
+    virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;
+
+    [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
+    [[nodiscard]] virtual size_t       GetSubmittedActionCount()              = 0;
+    [[nodiscard]] virtual bool         IsHealthy()                            = 0;
+
+    /// Number of actions the runner reports it can take.
+    [[nodiscard]] virtual size_t QueryCapacity();
+
+    /// Batch submission; yields one result per action.
+    [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+
+protected:
+    std::filesystem::path m_ActionsPath;          // directory that receives dumped actions
+    bool                  m_DumpActions = false;  // enables MaybeDumpAction()
+
+    /// Writes ActionObject to disk when m_DumpActions is set.
+    void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject);
+};
+
+/**
+ * A set of runners of the same type that can be addressed as one.
+ *
+ * Submissions are spread across the runners in round-robin fashion; all other
+ * operations fan out to every runner. The runner list is protected by a
+ * reader/writer lock.
+ */
+template<typename RunnerType>
+struct RunnerGroup
+{
+    /// Adds a runner to the group; the group holds a reference to it.
+    void AddRunner(RunnerType* Runner)
+    {
+        m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); });
+    }
+
+    /// Sum of QueryCapacity() over all runners.
+    size_t QueryCapacity()
+    {
+        size_t TotalCapacity = 0;
+        m_RunnersLock.WithSharedLock([&] {
+            for (const auto& Runner : m_Runners)
+            {
+                TotalCapacity += Runner->QueryCapacity();
+            }
+        });
+        return TotalCapacity;
+    }
+
+    /**
+     * Offer the action to each runner at most once, starting with the runner after
+     * the one that accepted the previous submission.
+     *
+     * @return the accepting runner's result, or a non-accepted result when the group
+     *         is empty or every runner declined
+     */
+    SubmitResult SubmitAction(Ref<RunnerAction> Action)
+    {
+        RwLock::SharedLockScope _(m_RunnersLock);
+
+        const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+
+        if (RunnerCount == 0)
+        {
+            return {.IsAccepted = false, .Reason = "No runners available"};
+        }
+
+        // Reduce the stored index into range so the probe sequence is always valid
+        const int FirstIndex = m_NextSubmitIndex.load(std::memory_order_acquire) % RunnerCount;
+
+        // Exactly RunnerCount attempts: the loop is bounded regardless of the start index
+        for (int Attempt = 0; Attempt < RunnerCount; ++Attempt)
+        {
+            const int Index = (FirstIndex + Attempt) % RunnerCount;
+
+            SubmitResult Result = m_Runners[Index]->SubmitAction(Action);
+
+            if (Result.IsAccepted)
+            {
+                // The next submission starts with the following runner
+                m_NextSubmitIndex = (Index + 1) % RunnerCount;
+
+                return Result;
+            }
+        }
+
+        return {.IsAccepted = false, .Reason = "All runners rejected the action"};
+    }
+
+    /// Sum of GetSubmittedActionCount() over all runners.
+    size_t GetSubmittedActionCount()
+    {
+        RwLock::SharedLockScope _(m_RunnersLock);
+
+        size_t TotalCount = 0;
+
+        for (const auto& Runner : m_Runners)
+        {
+            TotalCount += Runner->GetSubmittedActionCount();
+        }
+
+        return TotalCount;
+    }
+
+    /// Registers the worker with every runner in the group.
+    void RegisterWorker(CbPackage Worker)
+    {
+        RwLock::SharedLockScope _(m_RunnersLock);
+
+        for (auto& Runner : m_Runners)
+        {
+            Runner->RegisterWorker(Worker);
+        }
+    }
+
+    /// Shuts down every runner in the group.
+    void Shutdown()
+    {
+        RwLock::SharedLockScope _(m_RunnersLock);
+
+        for (auto& Runner : m_Runners)
+        {
+            Runner->Shutdown();
+        }
+    }
+
+private:
+    RwLock                       m_RunnersLock;
+    std::vector<Ref<RunnerType>> m_Runners;
+    std::atomic<int>             m_NextSubmitIndex{0};  // index of the runner to try first on the next submit
+};
+
+/**
+ * An action as it moves through the stages of scheduling and execution.
+ *
+ * The current stage is held in an atomic state variable; for each stage the time at
+ * which it was entered is kept in Timestamps.
+ */
+struct RunnerAction : public RefCounted
+{
+    explicit RunnerAction(FunctionServiceSession* OwnerSession);
+    ~RunnerAction();
+
+    int        ActionLsn = 0;
+    WorkerDesc Worker;
+    IoHash     ActionId;
+    CbObject   ActionObj;
+    int        Priority = 0;
+
+    // Declaration order is significant: SetActionState() only permits moving to a
+    // state with a greater value than the current one
+    enum class State
+    {
+        New,
+        Pending,
+        Running,
+        Completed,
+        Failed,
+        _Count
+    };
+
+    /// Human readable name of a state; "Unknown" for values outside the enumeration.
+    static const char* ToString(State Value)
+    {
+        static const char* const Names[] = {"New", "Pending", "Running", "Completed", "Failed"};
+
+        const int Index = static_cast<int>(Value);
+
+        if (Index < 0 || Index >= static_cast<int>(State::_Count))
+        {
+            return "Unknown";
+        }
+
+        return Names[Index];
+    }
+
+    // One tick count per state, indexed by the state's integer value
+    uint64_t Timestamps[static_cast<int>(State::_Count)] = {};
+
+    State ActionState() const { return m_ActionState.load(); }
+    void  SetActionState(State NewState);
+
+    bool IsSuccess() const { return ActionState() == State::Completed; }
+    bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; }
+
+    void       SetResult(CbPackage&& Result);
+    CbPackage& GetResult();
+
+private:
+    std::atomic<State>      m_ActionState  = State::New;
+    FunctionServiceSession* m_OwnerSession = nullptr;  // notified through PostUpdate() on state changes
+    CbPackage               m_Result;
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp
new file mode 100644
index 000000000..0698449e9
--- /dev/null
+++ b/src/zencompute/functionservice.cpp
@@ -0,0 +1,957 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+# include "actionrecorder.h"
+# include "localrunner.h"
+# include "remotehttprunner.h"
+
+# include <zencompute/recordingreader.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zentelemetry/stats.h>
+
+# include <set>
+# include <deque>
+# include <map>
+# include <thread>
+# include <unordered_map>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/hash_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Private implementation of FunctionServiceSession.
+ *
+ * Holds the worker registry, the pending/running/result action tables, the runner
+ * groups and the optional recorder. A scheduling thread is started on construction
+ * and joined in Shutdown().
+ */
+struct FunctionServiceSession::Impl
+{
+    FunctionServiceSession* m_FunctionServiceSession;
+    ChunkResolver&          m_ChunkResolver;
+    LoggerRef               m_Log{logging::Get("apply")};
+
+    Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver)
+    : m_FunctionServiceSession(InFunctionServiceSession)
+    , m_ChunkResolver(InChunkResolver)
+    {
+        // Started from the constructor body, i.e. after every member has been initialized
+        m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this};
+    }
+
+    void Shutdown();
+    bool IsHealthy();
+
+    LoggerRef Log() { return m_Log; }
+
+    std::atomic_bool m_AcceptActions = true;
+
+    // One entry per function exposed by a registered worker
+    struct FunctionDefinition
+    {
+        std::string FunctionName;
+        Guid        FunctionVersion;
+        Guid        BuildSystemVersion;
+        IoHash      WorkerId;
+    };
+
+    /// Writes worker/action counts and the retire rate into Cbo.
+    void EmitStats(CbObjectWriter& Cbo)
+    {
+        m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
+        m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
+        m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
+        Cbo << "actions_submitted"sv << GetSubmittedActionCount();
+        EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
+    }
+
+    void       RegisterWorker(CbPackage Worker);
+    WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+
+    std::atomic<int32_t> m_ActionsCounter = 0;  // sequence number
+
+    // Action tables, keyed by action LSN; each has its own lock
+
+    RwLock                           m_PendingLock;
+    std::map<int, Ref<RunnerAction>> m_PendingActions;
+
+    RwLock                                     m_RunningLock;
+    std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
+
+    RwLock                                     m_ResultsLock;
+    std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
+    metrics::Meter                             m_ResultRate;
+    std::atomic<uint64_t>                      m_RetiredCount{0};
+
+    HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+    HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+
+    std::atomic<bool> m_ShutdownRequested{false};
+
+    // Scheduling thread; the event is signalled by Shutdown() before joining
+
+    std::thread       m_SchedulingThread;
+    std::atomic<bool> m_SchedulingThreadEnabled{true};
+    Event             m_SchedulingThreadEvent;
+
+    void MonitorThreadFunction();
+    void SchedulePendingActions();
+
+    // Workers
+
+    RwLock                                m_WorkerLock;
+    std::unordered_map<IoHash, CbPackage> m_WorkerMap;
+    std::vector<FunctionDefinition>       m_FunctionList;
+    std::vector<IoHash>                   GetKnownWorkerIds();
+
+    // Runners
+
+    RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup;
+    RunnerGroup<RemoteHttpRunner>   m_RemoteRunnerGroup;
+
+    EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+    EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority);
+
+    void GetCompleted(CbWriter& Cbo);
+
+    // Recording
+
+    void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
+    void StopRecording();
+
+    std::unique_ptr<ActionRecorder> m_Recorder;  // null while not recording
+
+    // History tracking
+
+    RwLock                                                 m_ActionHistoryLock;
+    std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory;
+    size_t                                                 m_HistoryLimit = 1000;
+
+    std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit);
+
+    // Submission to the runner groups
+
+    [[nodiscard]] size_t QueryCapacity();
+
+    [[nodiscard]] SubmitResult              SubmitAction(Ref<RunnerAction> Action);
+    [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+    [[nodiscard]] size_t                    GetSubmittedActionCount();
+
+    // Updates
+
+    RwLock                         m_UpdatedActionsLock;
+    std::vector<Ref<RunnerAction>> m_UpdatedActions;
+
+    void HandleActionUpdates();
+    void PostUpdate(RunnerAction* Action);
+
+    void ShutdownRunners();
+};
+
+/// Health check; currently always reports healthy.
+bool
+FunctionServiceSession::Impl::IsHealthy()
+{
+    constexpr bool Healthy = true;
+    return Healthy;
+}
+
+/**
+ * Stop accepting actions, stop and join the scheduling thread, then shut down all
+ * runners.
+ */
+void
+FunctionServiceSession::Impl::Shutdown()
+{
+    m_AcceptActions     = false;
+    m_ShutdownRequested = true;
+
+    // Disable the scheduling thread and signal its event before waiting for it
+    m_SchedulingThreadEnabled = false;
+    m_SchedulingThreadEvent.Set();
+
+    if (m_SchedulingThread.joinable())
+    {
+        m_SchedulingThread.join();
+    }
+
+    ShutdownRunners();
+}
+
+/// Shuts down the local runner group followed by the remote runner group.
+void
+FunctionServiceSession::Impl::ShutdownRunners()
+{
+    m_LocalRunnerGroup.Shutdown();
+    m_RemoteRunnerGroup.Shutdown();
+}
+
+/// Begins recording by creating a new ActionRecorder for RecordingPath; any previous recorder is replaced.
+void
+FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
+{
+    ZEN_INFO("starting recording to '{}'", RecordingPath);
+
+    m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
+
+    ZEN_INFO("started recording to '{}'", RecordingPath);
+}
+
+/// Ends recording by destroying the current recorder, if there is one.
+void
+FunctionServiceSession::Impl::StopRecording()
+{
+    ZEN_INFO("stopping recording");
+
+    m_Recorder.reset();
+
+    ZEN_INFO("stopped recording");
+}
+
+/**
+ * Snapshot of the action history.
+ *
+ * @param Limit when positive and smaller than the history size, only the last Limit
+ *              entries are returned; otherwise the whole history is returned
+ */
+std::vector<FunctionServiceSession::ActionHistoryEntry>
+FunctionServiceSession::Impl::GetActionHistory(int Limit)
+{
+    RwLock::SharedLockScope HistoryScope(m_ActionHistoryLock);
+
+    auto First = m_ActionHistory.begin();
+
+    const bool Truncate = Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size();
+    if (Truncate)
+    {
+        First = m_ActionHistory.end() - Limit;
+    }
+
+    return std::vector<ActionHistoryEntry>(First, m_ActionHistory.end());
+}
+
+/**
+ * Register a worker package with the session.
+ *
+ * The first time a worker id is seen it is forwarded to both runner groups and to
+ * the recorder (when recording), and its functions are added to the function list.
+ * Re-registering a known id only replaces the stored package.
+ */
+void
+FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker)
+{
+    RwLock::ExclusiveLockScope WorkerScope(m_WorkerLock);
+
+    const IoHash WorkerId = Worker.GetObject().GetHash();
+
+    const bool IsNewWorker = m_WorkerMap.insert_or_assign(WorkerId, Worker).second;
+
+    if (!IsNewWorker)
+    {
+        return;
+    }
+
+    // By current convention the WorkerId is the hash of the worker descriptor, so a
+    // later registration under the same id cannot carry a different descriptor.
+    // Everything below therefore only has to happen when the worker is first added
+
+    m_LocalRunnerGroup.RegisterWorker(Worker);
+    m_RemoteRunnerGroup.RegisterWorker(Worker);
+
+    if (m_Recorder)
+    {
+        m_Recorder->RegisterWorker(Worker);
+    }
+
+    // Populate worker database
+
+    CbObject   Descriptor         = Worker.GetObject();
+    const Guid BuildSystemVersion = Descriptor["buildsystem_version"sv].AsUuid();
+
+    for (auto& Item : Descriptor["functions"sv])
+    {
+        CbObjectView Function = Item.AsObjectView();
+
+        std::string_view Name    = Function["name"sv].AsString();
+        const Guid       Version = Function["version"sv].AsUuid();
+
+        m_FunctionList.emplace_back(FunctionDefinition{.FunctionName       = std::string{Name},
+                                                       .FunctionVersion    = Version,
+                                                       .BuildSystemVersion = BuildSystemVersion,
+                                                       .WorkerId           = WorkerId});
+    }
+}
+
+/// Looks up a registered worker by id; returns a default-constructed WorkerDesc when the id is unknown.
+WorkerDesc
+FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+    RwLock::SharedLockScope WorkerScope(m_WorkerLock);
+
+    const auto Found = m_WorkerMap.find(WorkerId);
+
+    if (Found == m_WorkerMap.end())
+    {
+        return {};
+    }
+
+    const CbPackage& Package = Found->second;
+    return {Package, WorkerId};
+}
+
+/// Returns the ids of all registered workers (in unspecified order).
+std::vector<IoHash>
+FunctionServiceSession::Impl::GetKnownWorkerIds()
+{
+    std::vector<IoHash> WorkerIds;
+
+    m_WorkerLock.WithSharedLock([&] {
+        // The map may only be read while the lock is held, and that includes its
+        // size: RegisterWorker() can be inserting concurrently
+        WorkerIds.reserve(m_WorkerMap.size());
+
+        for (const auto& [WorkerId, _] : m_WorkerMap)
+        {
+            WorkerIds.push_back(WorkerId);
+        }
+    });
+
+    return WorkerIds;
+}
+
+/**
+ * Resolve the worker that implements the function named by an action and enqueue
+ * the action for it.
+ *
+ * The action's "Function", "FunctionVersion" and "BuildSystemVersion" fields are
+ * matched against the functions of all registered workers.
+ *
+ * @return the result of EnqueueResolvedAction() on success; otherwise a result with
+ *         LSN 0 and an object describing the request and the error
+ */
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority)
+{
+    std::string_view FunctionName       = ActionObject["Function"sv].AsString();
+    const Guid       FunctionVersion    = ActionObject["FunctionVersion"sv].AsUuid();
+    const Guid       BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+    // Builds the failure response shared by both error paths
+    auto MakeError = [&](const char* Message) -> EnqueueResult {
+        CbObjectWriter Writer;
+
+        Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
+        Writer << "error" << Message;
+
+        return {0, Writer.Save()};
+    };
+
+    // Resolve function to worker
+
+    IoHash    WorkerId{IoHash::Zero};
+    CbPackage WorkerPackage;
+    bool      HaveMatch   = false;
+    bool      HavePackage = false;
+
+    {
+        // The function list and worker map are modified by RegisterWorker() under
+        // the exclusive lock, so they must be read under the shared lock. The
+        // package is copied out so the lock can be dropped before enqueueing
+        RwLock::SharedLockScope WorkerScope(m_WorkerLock);
+
+        for (const FunctionDefinition& FuncDef : m_FunctionList)
+        {
+            if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion &&
+                FuncDef.BuildSystemVersion == BuildSystemVersion)
+            {
+                WorkerId = FuncDef.WorkerId;
+
+                break;
+            }
+        }
+
+        HaveMatch = !(WorkerId == IoHash::Zero);
+
+        if (HaveMatch)
+        {
+            if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
+            {
+                WorkerPackage = It->second;
+                HavePackage   = true;
+            }
+        }
+    }
+
+    if (!HaveMatch)
+    {
+        return MakeError("no worker matches the action specification");
+    }
+
+    if (!HavePackage)
+    {
+        return MakeError("no worker found despite match");
+    }
+
+    return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority);
+}
+
+/**
+ * Create an action for an already resolved worker and try to hand it to a runner
+ * immediately.
+ *
+ * If no runner accepts the action it is moved to the Pending state. The action is
+ * also passed to the recorder when recording is active.
+ *
+ * @return the action's LSN together with an object holding "lsn", "worker" and "action"
+ */
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+    const int Lsn = ++m_ActionsCounter;
+
+    Ref<RunnerAction> Action{new RunnerAction(m_FunctionServiceSession)};
+
+    Action->ActionLsn = Lsn;
+    Action->Worker    = Worker;
+    Action->ActionId  = ActionObj.GetHash();
+    Action->ActionObj = ActionObj;
+    Action->Priority  = RequestPriority;
+
+    const SubmitResult Submitted = SubmitAction(Action);
+
+    if (!Submitted.IsAccepted)
+    {
+        ZEN_DEBUG("action {} ({}) PENDING", Action->ActionId, Action->ActionLsn);
+
+        Action->SetActionState(RunnerAction::State::Pending);
+    }
+    else
+    {
+        // Great, the job is being taken care of by the runner
+        ZEN_DEBUG("direct schedule LSN {}", Action->ActionLsn);
+    }
+
+    if (m_Recorder)
+    {
+        m_Recorder->RecordAction(Action);
+    }
+
+    CbObjectWriter Response;
+    Response << "lsn" << Action->ActionLsn;
+    Response << "worker" << Action->Worker.WorkerId;
+    Response << "action" << Action->ActionId;
+
+    return {Action->ActionLsn, Response.Save()};
+}
+
+/**
+ * Offer an action to the local runner group first and, if it is not accepted there,
+ * to the remote runner group.
+ */
+SubmitResult
+FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action)
+{
+    // Within each group actions are spread loosely round-robin across runners.
+    //
+    // With submits arriving on several threads the exact meaning of that is fuzzy,
+    // but it is likely preferable to always starting with the first runner.
+    //
+    // Longer term the state of individual runners should be tracked and used to
+    // make the decision.
+
+    if (SubmitResult LocalResult = m_LocalRunnerGroup.SubmitAction(Action); LocalResult.IsAccepted)
+    {
+        return LocalResult;
+    }
+
+    return m_RemoteRunnerGroup.SubmitAction(Action);
+}
+
+/// Total number of submitted actions reported by the local and remote runner groups.
+size_t
+FunctionServiceSession::Impl::GetSubmittedActionCount()
+{
+    const size_t LocalCount  = m_LocalRunnerGroup.GetSubmittedActionCount();
+    const size_t RemoteCount = m_RemoteRunnerGroup.GetSubmittedActionCount();
+
+    return LocalCount + RemoteCount;
+}
+
+/**
+ * Fetch the result of an action by LSN.
+ *
+ * @return OK when a result was available (it is moved into OutResultPackage and
+ *         removed from the results table), Accepted when the action is still
+ *         pending or running, NotFound otherwise
+ */
+HttpResponseCode
+FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+    // The results lock stays held until we return so that the action cannot
+    // change state while the individual tables are being inspected
+
+    RwLock::ExclusiveLockScope ResultsScope(m_ResultsLock);
+
+    auto ResultIt = m_ResultsMap.find(ActionLsn);
+
+    if (ResultIt != m_ResultsMap.end())
+    {
+        OutResultPackage = std::move(ResultIt->second->GetResult());
+
+        m_ResultsMap.erase(ResultIt);
+
+        return HttpResponseCode::OK;
+    }
+
+    {
+        RwLock::SharedLockScope PendingScope(m_PendingLock);
+
+        if (m_PendingActions.contains(ActionLsn))
+        {
+            return HttpResponseCode::Accepted;
+        }
+    }
+
+    // Lock order matters to avoid deadlocks: when both are needed, m_RunningLock
+    // must always be acquired after m_ResultsLock
+
+    {
+        RwLock::SharedLockScope RunningScope(m_RunningLock);
+
+        if (m_RunningMap.contains(ActionLsn))
+        {
+            return HttpResponseCode::Accepted;
+        }
+    }
+
+    return HttpResponseCode::NotFound;
+}
+
+/**
+ * Fetch the result of an action by action id (linear search over the tables).
+ *
+ * @return OK when a result was available (it is moved into OutResultPackage and
+ *         removed from the results table), Accepted when a pending or running
+ *         action has this id, NotFound otherwise
+ */
+HttpResponseCode
+FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+    // The results lock stays held until we return so that the action cannot
+    // change state while the individual tables are being inspected
+
+    RwLock::ExclusiveLockScope ResultsScope(m_ResultsLock);
+
+    for (auto ResultIt = m_ResultsMap.begin(); ResultIt != m_ResultsMap.end(); ++ResultIt)
+    {
+        if (!(ResultIt->second->ActionId == ActionId))
+        {
+            continue;
+        }
+
+        OutResultPackage = std::move(ResultIt->second->GetResult());
+
+        m_ResultsMap.erase(ResultIt);
+
+        return HttpResponseCode::OK;
+    }
+
+    {
+        RwLock::SharedLockScope PendingScope(m_PendingLock);
+
+        for (const auto& [Lsn, PendingAction] : m_PendingActions)
+        {
+            if (PendingAction->ActionId == ActionId)
+            {
+                return HttpResponseCode::Accepted;
+            }
+        }
+    }
+
+    // Lock order matters to avoid deadlocks: when both are needed, m_RunningLock
+    // must always be acquired after m_ResultsLock
+
+    {
+        RwLock::SharedLockScope RunningScope(m_RunningLock);
+
+        for (const auto& [Lsn, RunningAction] : m_RunningMap)
+        {
+            if (RunningAction->ActionId == ActionId)
+            {
+                return HttpResponseCode::Accepted;
+            }
+        }
+    }
+
+    return HttpResponseCode::NotFound;
+}
+
+/// Writes a "completed" array containing the LSN of every action that currently has a result.
+void
+FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo)
+{
+    Cbo.BeginArray("completed");
+
+    {
+        RwLock::SharedLockScope ResultsScope(m_ResultsLock);
+
+        for (const auto& Entry : m_ResultsMap)
+        {
+            Cbo << Entry.first;
+        }
+    }
+
+    Cbo.EndArray();
+}
+
+# define ZEN_BATCH_SCHEDULER 1
+
+void
+FunctionServiceSession::Impl::SchedulePendingActions()
+{
+    // Tries to hand actions from the pending queue over to the runners.
+    //
+    // Candidates are collected while holding m_PendingLock, the actual
+    // submission happens outside of the lock. Unless dismissed, the scope
+    // guard below logs a summary when the function returns and, at most once
+    // every 30 seconds, the list of running LSNs.
+
+    int    ScheduledCount = 0;
+    size_t RunningCount   = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+    size_t PendingCount   = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+    size_t ResultCount    = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
+
+    static Stopwatch DumpRunningTimer;
+
+    auto _ = MakeGuard([&] {
+        ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
+                 ScheduledCount,
+                 RunningCount,
+                 m_RetiredCount.load(),
+                 PendingCount,
+                 ResultCount);
+
+        if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
+        {
+            DumpRunningTimer.Reset();
+
+            // std::set so that the LSNs are listed in ascending order
+            std::set<int> RunningList;
+            m_RunningLock.WithSharedLock([&] {
+                for (auto& [K, V] : m_RunningMap)
+                {
+                    RunningList.insert(K);
+                }
+            });
+
+            ExtendableStringBuilder<1024> RunningString;
+            for (int i : RunningList)
+            {
+                if (RunningString.Size())
+                {
+                    RunningString << ", ";
+                }
+
+                RunningString.Append(IntNum(i));
+            }
+
+            ZEN_INFO("running: {}", RunningString);
+        }
+    });
+
+# if ZEN_BATCH_SCHEDULER
+    size_t Capacity = QueryCapacity();
+
+    if (!Capacity)
+    {
+        // No runner can take on work right now, nothing to report
+        _.Dismiss();
+
+        return;
+    }
+
+    std::vector<Ref<RunnerAction>> ActionsToSchedule;
+
+    // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock
+
+    m_PendingLock.WithExclusiveLock([&] {
+        if (m_ShutdownRequested)
+        {
+            return;
+        }
+
+        if (m_PendingActions.empty())
+        {
+            return;
+        }
+
+        // Note that every examined entry consumes one unit of this budget,
+        // including entries which are skipped because of their state
+        size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size());
+
+        auto       PendingIt  = m_PendingActions.begin();
+        const auto PendingEnd = m_PendingActions.end();
+
+        while (NumActionsToSchedule && PendingIt != PendingEnd)
+        {
+            const Ref<RunnerAction>& Pending = PendingIt->second;
+
+            switch (Pending->ActionState())
+            {
+                case RunnerAction::State::Pending:
+                    ActionsToSchedule.push_back(Pending);
+                    break;
+
+                case RunnerAction::State::Running:
+                case RunnerAction::State::Completed:
+                case RunnerAction::State::Failed:
+                    // Already past the pending state, nothing to submit
+                    break;
+
+                default:
+                case RunnerAction::State::New:
+                    ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn);
+                    break;
+            }
+
+            ++PendingIt;
+            --NumActionsToSchedule;
+        }
+
+        PendingCount = m_PendingActions.size();
+    });
+
+    if (ActionsToSchedule.empty())
+    {
+        _.Dismiss();
+        return;
+    }
+
+    ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
+
+    auto SubmitResults = SubmitActions(ActionsToSchedule);
+
+    // Only submissions which were accepted by a runner count as scheduled;
+    // rejected actions simply remain in the pending queue. This function does
+    // not move anything between the pending and running maps itself, the
+    // transition is applied by HandleActionUpdates() once a state change has
+    // been posted for the action.
+
+    for (const SubmitResult& SubResult : SubmitResults)
+    {
+        if (SubResult.IsAccepted)
+        {
+            ++ScheduledCount;
+        }
+    }
+
+# else
+    m_PendingLock.WithExclusiveLock([&] {
+        while (!m_PendingActions.empty())
+        {
+            if (m_ShutdownRequested)
+            {
+                return;
+            }
+
+            // Here it would be good if we could decide to pop immediately to avoid
+            // holding the lock while creating processes etc
+            const Ref<RunnerAction>&     Pending   = m_PendingActions.begin()->second;
+            FunctionRunner::SubmitResult SubResult = SubmitAction(Pending);
+
+            if (SubResult.IsAccepted)
+            {
+                // Great, the job is being taken care of by the runner
+
+                ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn);
+
+                m_RunningLock.WithExclusiveLock([&] {
+                    m_RunningMap.insert({Pending->ActionLsn, Pending});
+
+                    RunningCount = m_RunningMap.size();
+                });
+
+                m_PendingActions.pop_front();
+
+                PendingCount = m_PendingActions.size();
+                ++ScheduledCount;
+            }
+            else
+            {
+                // Runner could not accept the job, leave it on the pending queue
+
+                return;
+            }
+        }
+    });
+# endif
+}
+
+void
+FunctionServiceSession::Impl::MonitorThreadFunction()
+{
+ SetCurrentThreadName("FunctionServiceSession_Monitor");
+ // Monitor thread body: applies posted action updates and schedules pending actions in a loop until m_SchedulingThreadEnabled is cleared
+ auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
+ do
+ {
+ int TimeoutMs = 1000;
+ // Use a shorter wait (100 ms instead of 1000 ms) while there are actions in the pending queue
+ if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }))
+ {
+ TimeoutMs = 100;
+ }
+ // Sleep until the scheduling event is signalled or the timeout expires
+ const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs);
+ // NOTE(review): the event is only Reset() below when Timedout is false - confirm that Wait() returns true on timeout (not on signal), otherwise that condition is inverted
+ if (m_SchedulingThreadEnabled == false)
+ {
+ return;
+ }
+ // Apply state changes which were queued through PostUpdate() before scheduling more work
+ HandleActionUpdates();
+
+ // Schedule pending actions
+
+ SchedulePendingActions();
+
+ if (!Timedout)
+ {
+ m_SchedulingThreadEvent.Reset();
+ }
+ } while (m_SchedulingThreadEnabled);
+}
+
+void
+FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action)
+{
+    // Queue the action so that HandleActionUpdates() can pick up its new
+    // state the next time it runs
+
+    RwLock::ExclusiveLockScope UpdatesGuard(m_UpdatedActionsLock);
+
+    m_UpdatedActions.emplace_back(Action);
+}
+
+void
+FunctionServiceSession::Impl::HandleActionUpdates()
+{
+    // Drains the queue filled by PostUpdate() and moves each updated action
+    // into the container matching its current state (pending queue, running
+    // map or results map). Finished actions are additionally appended to the
+    // bounded action history.
+
+    std::vector<Ref<RunnerAction>> UpdatedActions;
+
+    // Grab the whole batch at once so the update lock is only held briefly
+    m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); });
+
+    // An action may have been posted several times, it is only processed
+    // once per batch using the state it has at this point
+    std::unordered_set<int> SeenLsn;
+
+    for (Ref<RunnerAction>& Action : UpdatedActions)
+    {
+        const int ActionLsn = Action->ActionLsn;
+
+        if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted)
+        {
+            switch (Action->ActionState())
+            {
+                case RunnerAction::State::Pending:
+                    m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+                    break;
+
+                case RunnerAction::State::Running:
+                    // Locks are nested pending -> running here
+                    m_PendingLock.WithExclusiveLock([&] {
+                        m_RunningLock.WithExclusiveLock([&] {
+                            m_RunningMap.insert({ActionLsn, Action});
+                            m_PendingActions.erase(ActionLsn);
+                        });
+                    });
+                    ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
+                    break;
+
+                case RunnerAction::State::Completed:
+                case RunnerAction::State::Failed:
+                    // Locks are nested results -> pending -> running here,
+                    // the history lock is taken while only the results lock
+                    // is held
+                    m_ResultsLock.WithExclusiveLock([&] {
+                        m_ResultsMap[ActionLsn] = Action;
+
+                        m_PendingLock.WithExclusiveLock([&] {
+                            m_RunningLock.WithExclusiveLock([&] {
+                                // The action may have finished before it was ever
+                                // recorded as running, in which case it is still
+                                // sitting in the pending queue
+                                if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+                                {
+                                    m_PendingActions.erase(ActionLsn);
+                                }
+                                else
+                                {
+                                    m_RunningMap.erase(FindIt);
+                                }
+                            });
+                        });
+
+                        m_ActionHistoryLock.WithExclusiveLock([&] {
+                            ActionHistoryEntry Entry{.Lsn              = ActionLsn,
+                                                     .ActionId         = Action->ActionId,
+                                                     .WorkerId         = Action->Worker.WorkerId,
+                                                     .ActionDescriptor = Action->ActionObj,
+                                                     .Succeeded        = Action->ActionState() == RunnerAction::State::Completed};
+
+                            std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps));
+
+                            m_ActionHistory.push_back(std::move(Entry));
+
+                            // Drop the oldest entry once the history limit is exceeded
+                            if (m_ActionHistory.size() > m_HistoryLimit)
+                            {
+                                m_ActionHistory.pop_front();
+                            }
+                        });
+                    });
+                    m_RetiredCount.fetch_add(1);
+                    m_ResultRate.Mark(1);
+                    ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
+                              Action->ActionId,
+                              ActionLsn,
+                              Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE");
+                    break;
+            }
+        }
+    }
+}
+
+size_t
+FunctionServiceSession::Impl::QueryCapacity()
+{
+    // Total capacity is the sum of what the local and the remote runner
+    // groups report
+
+    const auto LocalCapacity  = m_LocalRunnerGroup.QueryCapacity();
+    const auto RemoteCapacity = m_RemoteRunnerGroup.QueryCapacity();
+
+    return LocalCapacity + RemoteCapacity;
+}
+
+std::vector<SubmitResult>
+FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    // Submits every action individually through SubmitAction(). The returned
+    // vector holds exactly one result per input action, in the same order.
+
+    std::vector<SubmitResult> Results;
+    Results.reserve(Actions.size());  // final size is known, avoid regrowth
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Results.push_back(SubmitAction(Action));
+    }
+
+    return Results;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver)
+{ // Creates the implementation object, which receives a back-pointer to this session and the chunk resolver
+ m_Impl = std::make_unique<Impl>(this, InChunkResolver);
+}
+ // Destructor always calls Shutdown(); this can be a second call when the owner already shut the session down (HttpFunctionService::Shutdown does), so Impl::Shutdown presumably tolerates repeated calls - TODO confirm
+FunctionServiceSession::~FunctionServiceSession()
+{
+ Shutdown();
+}
+ // Forwards to Impl::IsHealthy(); the HTTP "ready" route answers ServiceUnavailable when this returns false
+bool
+FunctionServiceSession::IsHealthy()
+{
+ return m_Impl->IsHealthy();
+}
+ // Forwards to Impl::Shutdown()
+void
+FunctionServiceSession::Shutdown()
+{
+ m_Impl->Shutdown();
+}
+ // Forwards to Impl::StartRecording() with the resolver and the recording path
+void
+FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
+{
+ m_Impl->StartRecording(InResolver, RecordingPath);
+}
+ // Forwards to Impl::StopRecording()
+void
+FunctionServiceSession::StopRecording()
+{
+ m_Impl->StopRecording();
+}
+ // Forwards to Impl::EmitStats(), which writes into the supplied object writer
+void
+FunctionServiceSession::EmitStats(CbObjectWriter& Cbo)
+{
+ m_Impl->EmitStats(Cbo);
+}
+ // Forwards to Impl::GetKnownWorkerIds()
+std::vector<IoHash>
+FunctionServiceSession::GetKnownWorkerIds()
+{
+ return m_Impl->GetKnownWorkerIds();
+}
+ // Forwards to Impl::GetWorkerDescriptor(); callers in this file test the returned WorkerDesc as a bool to detect an unknown worker
+WorkerDesc
+FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+ return m_Impl->GetWorkerDescriptor(WorkerId);
+}
+ // Allocates a LocalProcessRunner and adds it to the local runner group; the group presumably takes ownership of the raw pointer - TODO confirm
+void
+FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath)
+{
+ m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath));
+}
+ // Allocates a RemoteHttpRunner for HostName and adds it to the remote runner group; ownership presumably passes to the group - TODO confirm
+void
+FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
+{
+ m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName));
+}
+ // Forwards to Impl::EnqueueAction(); unlike EnqueueResolvedAction() no worker descriptor is supplied by the caller
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority)
+{
+ return m_Impl->EnqueueAction(ActionObject, Priority);
+}
+ // Forwards to Impl::EnqueueResolvedAction() for an action whose worker descriptor the caller has already looked up
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority);
+}
+ // Forwards to Impl::RegisterWorker()
+void
+FunctionServiceSession::RegisterWorker(CbPackage Worker)
+{
+ m_Impl->RegisterWorker(Worker);
+}
+ // Looks up an action by LSN via Impl::GetActionResult(); the HTTP layer only sends OutResultPackage when the returned code is OK
+HttpResponseCode
+FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ return m_Impl->GetActionResult(ActionLsn, OutResultPackage);
+}
+ // Looks up an action by action id via Impl::FindActionResult() (linear scan); on OK the result is moved into OutResultPackage and removed from the results map
+HttpResponseCode
+FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ return m_Impl->FindActionResult(ActionId, OutResultPackage);
+}
+ // Forwards to Impl::GetActionHistory() with the requested entry limit
+std::vector<FunctionServiceSession::ActionHistoryEntry>
+FunctionServiceSession::GetActionHistory(int Limit)
+{
+ return m_Impl->GetActionHistory(Limit);
+}
+ // Forwards to Impl::GetCompleted(), which writes a "completed" array with the LSN of every entry in the results map
+void
+FunctionServiceSession::GetCompleted(CbWriter& Cbo)
+{
+ m_Impl->GetCompleted(Cbo);
+}
+ // Forwards to Impl::PostUpdate(), which queues the action for HandleActionUpdates() on the monitor thread
+void
+FunctionServiceSession::PostUpdate(RunnerAction* Action)
+{
+ m_Impl->PostUpdate(Action);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+function_forcelink()
+{ // Intentionally empty; presumably referenced elsewhere only to force this translation unit to be linked - TODO confirm
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp
new file mode 100644
index 000000000..09a9684a7
--- /dev/null
+++ b/src/zencompute/httpfunctionservice.cpp
@@ -0,0 +1,709 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httpfunctionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+constinit AsciiSet g_DecimalSet("0123456789");
+auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
+
+constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");
+auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
+
+HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
+ IHttpStatsService& StatsService,
+ [[maybe_unused]] const std::filesystem::path& BaseDir)
+: m_CidStore(InCidStore)
+, m_StatsService(StatsService)
+, m_Log(logging::Get("apply"))
+, m_BaseDir(BaseDir)
+, m_FunctionService(InCidStore)
+{
+ m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local");
+
+ m_StatsService.RegisterHandler("apply", *this);
+
+ m_Router.AddMatcher("lsn", DecimalMatcher);
+ m_Router.AddMatcher("worker", IoHashMatcher);
+ m_Router.AddMatcher("action", IoHashMatcher);
+
+ m_Router.RegisterRoute(
+ "ready",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (m_FunctionService.IsHealthy())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "workers",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers"sv);
+ for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds())
+ {
+ Cbo << WorkerId;
+ }
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "workers/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId))
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject());
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+
+ case HttpVerb::kPost:
+ {
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ CbObject WorkerSpec = HttpReq.ReadPayloadObject();
+
+ // Determine which pieces are missing and need to be transmitted
+
+ HashKeySet ChunkSet;
+
+ WorkerSpec.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Hash = Field.AsHash();
+ ChunkSet.AddHashToSet(Hash);
+ });
+
+ CbPackage WorkerPackage;
+ WorkerPackage.SetObject(WorkerSpec);
+
+ m_CidStore.FilterChunks(ChunkSet);
+
+ if (ChunkSet.IsEmpty())
+ {
+ ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
+ m_FunctionService.RegisterWorker(WorkerPackage);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("need");
+
+ ChunkSet.IterateHashes([&](const IoHash& Hash) {
+ ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
+ ResponseWriter.AddHash(Hash);
+ });
+
+ ResponseWriter.EndArray();
+
+ ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage();
+ CbObject WorkerSpec = WorkerSpecPackage.GetObject();
+
+ std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer Buffer = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+ TotalAttachmentBytes += Buffer.GetCompressedSize();
+ ++AttachmentCount;
+
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += Buffer.GetCompressedSize();
+ ++NewAttachmentCount;
+ }
+ }
+
+ ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+ WorkerId,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ m_FunctionService.RegisterWorker(WorkerSpecPackage);
+
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+ break;
+
+ default:
+ break;
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs/completed",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObjectWriter Cbo;
+ m_FunctionService.GetCompleted(Cbo);
+
+ SystemMetrics Sm = GetSystemMetricsForReporting();
+ Cbo.BeginObject("metrics");
+ Describe(Sm, Cbo);
+ Cbo.EndObject();
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/history",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto QueryParams = HttpReq.GetQueryParams();
+
+ int QueryLimit = 50;
+
+ if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
+ {
+ QueryLimit = ParseInt<int>(LimitParam).value_or(50);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("history");
+ for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit))
+ {
+ Cbo.BeginObject();
+ Cbo << "lsn"sv << Entry.Lsn;
+ Cbo << "actionId"sv << Entry.ActionId;
+ Cbo << "workerId"sv << Entry.WorkerId;
+ Cbo << "succeeded"sv << Entry.Succeeded;
+ Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
+
+ for (const auto& Timestamp : Entry.Timestamps)
+ {
+ Cbo.AddInteger(
+ fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))),
+ Timestamp);
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+     "jobs/{lsn}",
+     [this](HttpRouterRequest& Req) {
+         // Result lookup for a scheduled action, addressed by its LSN
+         HttpServerRequest& HttpReq = Req.ServerRequest();
+
+         // The "lsn" matcher only guarantees that the capture consists of
+         // decimal digits, not that the value fits into an int. Reject
+         // anything that cannot be parsed instead of letting a conversion
+         // exception escape from the handler.
+         const auto ParsedLsn = ParseInt<int>(Req.GetCapture(1));
+
+         if (!ParsedLsn.has_value())
+         {
+             return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+         }
+
+         const int ActionLsn = ParsedLsn.value();
+
+         switch (HttpReq.RequestVerb())
+         {
+             case HttpVerb::kGet:
+                 {
+                     CbPackage        Output;
+                     HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output);
+
+                     if (ResponseCode == HttpResponseCode::OK)
+                     {
+                         return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                     }
+
+                     // Any other status is passed through without a payload
+                     return HttpReq.WriteResponse(ResponseCode);
+                 }
+                 break;
+
+             case HttpVerb::kPost:
+                 {
+                     // Add support for cancellation, priority changes
+                 }
+                 break;
+
+             default:
+                 break;
+         }
+     },
+     HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+     "jobs/{worker}/{action}",  // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
+                                // one which uses the scheduled action lsn for lookups
+     [this](HttpRouterRequest& Req) {
+         HttpServerRequest& HttpReq  = Req.ServerRequest();
+         const IoHash       ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+         // Only the action id takes part in the lookup, the worker capture
+         // is used for logging only
+         CbPackage              Output;
+         const HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output);
+
+         if (ResponseCode != HttpResponseCode::OK)
+         {
+             ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode));
+
+             // Any non-OK status is passed through without a payload
+             return HttpReq.WriteResponse(ResponseCode);
+         }
+
+         ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2));
+
+         return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+     },
+     HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+     "jobs/{worker}",
+     [this](HttpRouterRequest& Req) {
+         // Submits an action for a specific worker, which must already be
+         // known to the function service (NotFound otherwise).
+         //
+         // A compact binary object payload is a "probe": if attachments are
+         // missing from the CID store the response is NotFound together with
+         // a "need" list, otherwise the action is enqueued. A package payload
+         // carries the attachments, which are stored before the action is
+         // enqueued. An optional "priority" query parameter is forwarded to
+         // the function service (-1 when absent or unparsable).
+         HttpServerRequest& HttpReq  = Req.ServerRequest();
+         const IoHash       WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+         WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId);
+
+         if (!Worker)
+         {
+             return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+         }
+
+         const auto QueryParams = HttpReq.GetQueryParams();
+
+         int RequestPriority = -1;
+
+         if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+         {
+             RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+         }
+
+         switch (HttpReq.RequestVerb())
+         {
+             case HttpVerb::kGet:
+                 // TODO: return status of all pending or executing jobs
+                 break;
+
+             case HttpVerb::kPost:
+                 switch (HttpReq.RequestContentType())
+                 {
+                     case HttpContentType::kCbObject:
+                         {
+                             // This operation takes the proposed job spec and identifies which
+                             // chunks are not present on this server. This list is then returned in
+                             // the "need" list in the response
+
+                             IoBuffer Payload   = HttpReq.ReadPayload();
+                             CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+                             std::vector<IoHash> NeedList;
+
+                             ActionObj.IterateAttachments([&](CbFieldView Field) {
+                                 const IoHash FileHash = Field.AsHash();
+
+                                 if (!m_CidStore.ContainsChunk(FileHash))
+                                 {
+                                     NeedList.push_back(FileHash);
+                                 }
+                             });
+
+                             if (NeedList.empty())
+                             {
+                                 // We already have everything, enqueue the action for execution
+
+                                 if (FunctionServiceSession::EnqueueResult Result =
+                                         m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+                                 {
+                                     ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);
+
+                                     return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                                 }
+                                 else
+                                 {
+                                     // The action was not accepted. Always answer the
+                                     // client, same status as the "jobs" route uses
+                                     return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                                 }
+                             }
+
+                             CbObjectWriter Cbo;
+                             Cbo.BeginArray("need");
+
+                             for (const IoHash& Hash : NeedList)
+                             {
+                                 Cbo << Hash;
+                             }
+
+                             Cbo.EndArray();
+                             CbObject Response = Cbo.Save();
+
+                             return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+                         }
+
+                     case HttpContentType::kCbPackage:
+                         {
+                             CbPackage Action    = HttpReq.ReadPayloadPackage();
+                             CbObject  ActionObj = Action.GetObject();
+
+                             std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+                             // Statistics for the log message below
+                             int      AttachmentCount      = 0;
+                             int      NewAttachmentCount   = 0;
+                             uint64_t TotalAttachmentBytes = 0;
+                             uint64_t TotalNewBytes        = 0;
+
+                             for (const CbAttachment& Attachment : Attachments)
+                             {
+                                 // NOTE(review): this asserts on client supplied data
+                                 ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+                                 const IoHash     DataHash = Attachment.GetHash();
+                                 CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+                                 const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+                                 TotalAttachmentBytes += CompressedSize;
+                                 ++AttachmentCount;
+
+                                 const CidStore::InsertResult InsertResult =
+                                     m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+                                 if (InsertResult.New)
+                                 {
+                                     TotalNewBytes += CompressedSize;
+                                     ++NewAttachmentCount;
+                                 }
+                             }
+
+                             if (FunctionServiceSession::EnqueueResult Result =
+                                     m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+                             {
+                                 ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
+                                           ActionObj.GetHash(),
+                                           Result.Lsn,
+                                           zen::NiceBytes(TotalAttachmentBytes),
+                                           AttachmentCount,
+                                           zen::NiceBytes(TotalNewBytes),
+                                           NewAttachmentCount);
+
+                                 return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+                             }
+                             else
+                             {
+                                 // See the object case above, never leave the
+                                 // request without a response
+                                 return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+                             }
+                         }
+
+                     default:
+                         // Unsupported payload type
+                         return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+                 }
+                 break;
+
+             default:
+                 break;
+         }
+     },
+     HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto QueryParams = HttpReq.GetQueryParams();
+
+ int RequestPriority = -1;
+
+ if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+ {
+ RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+ }
+
+ // Resolve worker
+
+ //
+
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ // This operation takes the proposed job spec and identifies which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
+
+ if (!m_CidStore.ContainsChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
+
+ if (NeedList.empty())
+ {
+ // We already have everything, enqueue the action for execution
+
+ if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ // Could not resolve?
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+ }
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
+
+ std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+ TotalAttachmentBytes += CompressedSize;
+ ++AttachmentCount;
+
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += CompressedSize;
+ ++NewAttachmentCount;
+ }
+ }
+
+ if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
+ Result.Lsn,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ // Could not resolve?
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+ return;
+ }
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "workers/all",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ std::vector<IoHash> WorkerIds = m_FunctionService.GetKnownWorkerIds();
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers");
+
+ for (const IoHash& WorkerId : WorkerIds)
+ {
+ Cbo.BeginObject();
+
+ Cbo << "id" << WorkerId;
+
+ const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId);
+
+ Cbo << "descriptor" << Descriptor.Descriptor.GetObject();
+
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "sysinfo",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ SystemMetrics Sm = GetSystemMetricsForReporting();
+
+ CbObjectWriter Cbo;
+ Describe(Sm, Cbo);
+
+ Cbo << "cpu_usage" << Sm.CpuUsagePercent;
+ Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
+ Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
+ Cbo << "disk_used" << 100 * 1024;
+ Cbo << "disk_total" << 100 * 1024 * 1024;
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "record/start",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording");
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "record/stop",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ m_FunctionService.StopRecording();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kPost);
+}
+
+HttpFunctionService::~HttpFunctionService()
+{ // Undoes the "apply" stats handler registration performed in the constructor
+ m_StatsService.UnregisterHandler("apply", *this);
+}
+
+void
+HttpFunctionService::Shutdown()
+{ // Shuts down the owned function service session
+ m_FunctionService.Shutdown();
+}
+
+const char*
+HttpFunctionService::BaseUri() const
+{ // Fixed URI prefix of this service; the returned pointer refers to a string literal
+ return "/apply/";
+}
+
+void
+HttpFunctionService::HandleRequest(HttpServerRequest& Request)
+{
+    // The scope object accounts this request against m_HttpRequests for as
+    // long as the handler is running
+    metrics::OperationTiming::Scope RequestTiming(m_HttpRequests);
+
+    const bool WasRouted = m_Router.HandleRequest(Request);
+
+    if (!WasRouted)
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+void
+HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request)
+{
+    // Let the function service describe itself and return the resulting
+    // object to the client
+    CbObjectWriter StatsWriter;
+
+    m_FunctionService.EmitStats(StatsWriter);
+
+    Request.WriteResponse(HttpResponseCode::OK, StatsWriter.Save());
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+httpfunction_forcelink()
+{ // Intentionally empty; presumably referenced elsewhere only to force this translation unit to be linked - TODO confirm
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/httporchestrator.cpp b/src/zencompute/httporchestrator.cpp
new file mode 100644
index 000000000..39e7e60d7
--- /dev/null
+++ b/src/zencompute/httporchestrator.cpp
@@ -0,0 +1,81 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httporchestrator.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/logging.h>
+
+namespace zen::compute {
+
+HttpOrchestratorService::HttpOrchestratorService() : m_Log(logging::Get("orch"))
+{ // Registers the two routes of the orchestrator: "provision" lists the known workers, "announce" adds or refreshes one
+ m_Router.RegisterRoute(
+ "provision",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ // Response: a "workers" array with one object per known worker ("uri" and "dt")
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers");
+ // Shared lock: the worker table is only read here
+ m_KnownWorkersLock.WithSharedLock([&] {
+ for (const auto& [WorkerId, Worker] : m_KnownWorkers)
+ {
+ Cbo.BeginObject();
+ Cbo << "uri" << Worker.BaseUri;
+ Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs(); // milliseconds elapsed on the LastSeen timer, which "announce" resets
+ Cbo.EndObject();
+ }
+ });
+
+ Cbo.EndArray();
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "announce",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ // Payload: an object with string fields "id" and "uri"
+ CbObject Data = HttpReq.ReadPayloadObject();
+
+ std::string_view WorkerId = Data["id"].AsString("");
+ std::string_view WorkerUri = Data["uri"].AsString("");
+ // Both fields are required; a missing field falls back to "" above and is rejected here
+ if (WorkerId.empty() || WorkerUri.empty())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ // operator[] creates the entry on first announce and reuses it afterwards
+ m_KnownWorkersLock.WithExclusiveLock([&] {
+ auto& Worker = m_KnownWorkers[std::string(WorkerId)];
+ Worker.BaseUri = WorkerUri;
+ Worker.LastSeen.Reset();
+ });
+
+ HttpReq.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kPost);
+}
+
+HttpOrchestratorService::~HttpOrchestratorService()
+{ // Nothing to release explicitly
+}
+
+const char*
+HttpOrchestratorService::BaseUri() const
+{ // Fixed URI prefix of this service; the returned pointer refers to a string literal
+ return "/orch/";
+}
+
+void
+HttpOrchestratorService::HandleRequest(HttpServerRequest& Request)
+{
+    // Dispatch through the router, a request without a matching route is
+    // only logged
+    const bool WasRouted = m_Router.HandleRequest(Request);
+
+    if (!WasRouted)
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/functionservice.h b/src/zencompute/include/zencompute/functionservice.h
new file mode 100644
index 000000000..1deb99fd5
--- /dev/null
+++ b/src/zencompute/include/zencompute/functionservice.h
@@ -0,0 +1,132 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zenstore/zenstore.h>
+# include <zenhttp/httpcommon.h>
+
+# include <filesystem>
+
+namespace zen {
+class ChunkResolver;
+class CbObjectWriter;
+} // namespace zen
+
+namespace zen::compute {
+
+class ActionRecorder;
+class FunctionServiceSession;
+class IActionResultHandler;
+class LocalProcessRunner;
+class RemoteHttpRunner;
+struct RunnerAction;
+struct SubmitResult;
+
+/**
+ * A worker descriptor package together with the id the worker is known by
+ *
+ * Converts to true when WorkerId is anything other than IoHash::Zero, which is also the
+ * value a default constructed instance starts out with.
+ */
+struct WorkerDesc
+{
+    CbPackage Descriptor;
+    IoHash    WorkerId = IoHash::Zero;
+
+    operator bool() const { return WorkerId != IoHash::Zero; }
+};
+
+/**
+ * Lambda style compute function service
+ *
+ * The responsibility of this class is to accept function execution requests, and
+ * schedule them using one or more FunctionRunner instances. It will basically always
+ * accept requests, queueing them if necessary, and then hand them off to runners
+ * as they become available.
+ *
+ * This is typically fronted by an API service that handles communication with clients.
+ *
+ * The implementation lives behind the private Impl type.
+ */
+class FunctionServiceSession final
+{
+public:
+    FunctionServiceSession(ChunkResolver& InChunkResolver);
+    ~FunctionServiceSession();
+
+    void Shutdown();
+    bool IsHealthy();
+
+    // Worker registration and discovery
+
+    void RegisterWorker(CbPackage Worker);
+    [[nodiscard]] WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+    [[nodiscard]] std::vector<IoHash> GetKnownWorkerIds();
+
+    // Action runners
+
+    void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath);
+    void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName);
+
+    // Action submission
+
+    /**
+     * Outcome of an enqueue request
+     *
+     * Converts to true when Lsn is non-zero. Lsn is zero-initialized so that a default
+     * constructed result is well defined and converts to false.
+     */
+    struct EnqueueResult
+    {
+        int      Lsn = 0;
+        CbObject ResponseMessage;
+
+        inline operator bool() const { return Lsn != 0; }
+    };
+
+    [[nodiscard]] EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int Priority);
+    [[nodiscard]] EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+
+    // Completed action tracking
+
+    [[nodiscard]] HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+    [[nodiscard]] HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+
+    void GetCompleted(CbWriter&);
+
+    // Action history tracking (note that this is separate from completed action tracking, and
+    // will include actions which have been retired and no longer have their results available)
+
+    /**
+     * One entry of the action history
+     *
+     * All scalar members carry initializers so that a default constructed entry never exposes
+     * indeterminate values.
+     */
+    struct ActionHistoryEntry
+    {
+        int      Lsn = 0;
+        IoHash   ActionId;
+        IoHash   WorkerId;
+        CbObject ActionDescriptor;
+        bool     Succeeded = false;
+        // NOTE(review): the meaning of the five slots is not visible in this header - confirm against the implementation
+        uint64_t Timestamps[5] = {};
+    };
+
+    [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
+
+    // Stats reporting
+
+    void EmitStats(CbObjectWriter& Cbo);
+
+    // Recording
+
+    void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
+    void StopRecording();
+
+private:
+    void PostUpdate(RunnerAction* Action);
+
+    friend class FunctionRunner;
+    friend struct RunnerAction;
+
+    struct Impl;
+    std::unique_ptr<Impl> m_Impl;
+};
+
+void function_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httpfunctionservice.h b/src/zencompute/include/zencompute/httpfunctionservice.h
new file mode 100644
index 000000000..6e2344ae6
--- /dev/null
+++ b/src/zencompute/include/zencompute/httpfunctionservice.h
@@ -0,0 +1,73 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#if !defined(ZEN_WITH_COMPUTE_SERVICES)
+# define ZEN_WITH_COMPUTE_SERVICES 1
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "zencompute/functionservice.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zentelemetry/stats.h>
+# include <zenhttp/httpserver.h>
+
+# include <deque>
+# include <filesystem>
+# include <unordered_map>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+class HttpFunctionService;
+class FunctionService;
+
+/**
+ * HTTP front end for the compute function service
+ *
+ * Implements both HttpService and IHttpStatsProvider, and holds the FunctionServiceSession
+ * instance as a member.
+ */
+class HttpFunctionService : public HttpService, public IHttpStatsProvider
+{
+public:
+    HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir);
+    ~HttpFunctionService();
+
+    void Shutdown();
+
+    // Request handling
+
+    const char* BaseUri() const override;
+    void HandleRequest(HttpServerRequest& Request) override;
+
+    // IHttpStatsProvider
+
+    void HandleStatsRequest(HttpServerRequest& Request) override;
+
+protected:
+    CidStore& m_CidStore;
+    IHttpStatsService& m_StatsService;
+
+    // Logger accessor
+    LoggerRef Log() { return m_Log; }
+
+private:
+    LoggerRef m_Log;
+    std::filesystem::path m_BaseDir;
+    HttpRequestRouter m_Router;
+    FunctionServiceSession m_FunctionService;
+
+    // Metrics
+
+    metrics::OperationTiming m_HttpRequests;
+};
+
+void httpfunction_forcelink();
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h
new file mode 100644
index 000000000..168c6d7fe
--- /dev/null
+++ b/src/zencompute/include/zencompute/httporchestrator.h
@@ -0,0 +1,44 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpserver.h>
+
+#include <unordered_map>
+
+namespace zen::compute {
+
+/**
+ * Mock orchestrator service, for testing dynamic provisioning
+ *
+ * Keeps a map of the workers known to it, keyed by worker id and guarded by
+ * m_KnownWorkersLock.
+ */
+
+class HttpOrchestratorService : public HttpService
+{
+public:
+    HttpOrchestratorService();
+    ~HttpOrchestratorService();
+
+    HttpOrchestratorService(const HttpOrchestratorService&) = delete;
+    HttpOrchestratorService& operator=(const HttpOrchestratorService&) = delete;
+
+    virtual const char* BaseUri() const override;
+    virtual void HandleRequest(HttpServerRequest& Request) override;
+
+private:
+    HttpRequestRouter m_Router;
+    LoggerRef m_Log;
+
+    struct KnownWorker
+    {
+        // Owning copy of the worker's URI. The value is assigned from a view into a request
+        // payload object that is local to the request handler, so storing a std::string_view
+        // here would leave the entry pointing at freed memory once the handler returns.
+        std::string BaseUri;
+        // Stopwatch tracking when the worker was last seen
+        Stopwatch LastSeen;
+    };
+
+    RwLock m_KnownWorkersLock;
+    std::unordered_map<std::string, KnownWorker> m_KnownWorkers;
+};
+
+} // namespace zen::compute
diff --git a/src/zencompute/include/zencompute/recordingreader.h b/src/zencompute/include/zencompute/recordingreader.h
new file mode 100644
index 000000000..bf1aff125
--- /dev/null
+++ b/src/zencompute/include/zencompute/recordingreader.h
@@ -0,0 +1,127 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencompute/functionservice.h>
+#include <zencompute/zencompute.h>
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
+#include <zenstore/zenstore.h>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+namespace zen {
+class CbObject;
+class CbPackage;
+struct IoHash;
+} // namespace zen
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Abstract, non-copyable interface for reading back a recording: the set of workers,
+ * the recorded actions and the number of actions
+ */
+class RecordingReaderBase
+{
+public:
+    RecordingReaderBase() = default;
+    virtual ~RecordingReaderBase() = 0;
+
+    RecordingReaderBase(const RecordingReaderBase&) = delete;
+    RecordingReaderBase& operator=(const RecordingReaderBase&) = delete;
+
+    // Returns the worker packages keyed by hash
+    virtual std::unordered_map<IoHash, CbPackage> ReadWorkers() = 0;
+
+    // Invokes Callback with each action object and its id
+    virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) = 0;
+
+    virtual size_t GetActionCount() const = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Reads recordings produced by the zencompute recording system. Such a recording
+ * consists of a shared chunk store plus a log of actions whose data is referenced
+ * via pointers into that chunk store.
+ *
+ * Also acts as the ChunkResolver for the chunks of the recording.
+ */
+class RecordingReader : public RecordingReaderBase, public ChunkResolver
+{
+public:
+    explicit RecordingReader(const std::filesystem::path& RecordingPath);
+    ~RecordingReader();
+
+    // RecordingReaderBase interface
+
+    std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;
+
+    void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
+                        int TargetParallelism) override;
+    size_t GetActionCount() const override;
+
+private:
+    std::filesystem::path m_RecordingLogDir;
+    BasicFile m_WorkerDataFile;
+    BasicFile m_ActionDataFile;
+    GcManager m_Gc;
+    CidStore m_CidStore{m_Gc};
+
+    // ChunkResolver interface
+    IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+    // An action's id together with an offset and size
+    struct ActionEntry
+    {
+        IoHash ActionId;
+        uint64_t Offset;
+        uint64_t Size;
+    };
+
+    std::vector<ActionEntry> m_Actions;
+
+    void ScanActions();
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/**
+ * Non-copyable ChunkResolver that serves chunks from a map of attachments which is
+ * filled through Add()
+ */
+struct LocalResolver : public ChunkResolver
+{
+    LocalResolver() = default;
+    ~LocalResolver() = default;
+
+    LocalResolver(const LocalResolver&) = delete;
+    LocalResolver& operator=(const LocalResolver&) = delete;
+
+    // ChunkResolver interface
+    IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+    // Registers Data under the given content id
+    void Add(const IoHash& Cid, IoBuffer Data);
+
+private:
+    RwLock MapLock;
+    std::unordered_map<IoHash, IoBuffer> Attachments;
+};
+
+/**
+ * Reader for UE/DDB recordings. These use a different on-disk layout than the
+ * zencompute recordings (there is no shared chunk store).
+ */
+class UeRecordingReader : public RecordingReaderBase, public ChunkResolver
+{
+public:
+    explicit UeRecordingReader(const std::filesystem::path& RecordingPath);
+    ~UeRecordingReader();
+
+    // RecordingReaderBase interface
+
+    std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override;
+    void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback,
+                        int TargetParallelism) override;
+    size_t GetActionCount() const override;
+
+    // ChunkResolver interface
+
+    IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+private:
+    std::filesystem::path m_RecordingDir;
+    LocalResolver m_LocalResolver;
+    std::vector<std::filesystem::path> m_WorkDirs;
+
+    CbPackage ReadAction(std::filesystem::path WorkDir);
+};
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/include/zencompute/zencompute.h b/src/zencompute/include/zencompute/zencompute.h
new file mode 100644
index 000000000..6dc32eeea
--- /dev/null
+++ b/src/zencompute/include/zencompute/zencompute.h
@@ -0,0 +1,11 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+namespace zen {
+
+// NOTE(review): presumably referenced by a host binary so the linker keeps the zencompute test
+// code - confirm against the definition in zencompute.cpp
+void zencompute_forcelinktests();
+
+}
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp
new file mode 100644
index 000000000..9a27f3f3d
--- /dev/null
+++ b/src/zencompute/localrunner.cpp
@@ -0,0 +1,722 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "localrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/system.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+/**
+ * Derives the worker and scratch directories from BaseDir, sizes the concurrency limit to
+ * twice the logical processor count, empties any directories left over from a previous
+ * run, starts the monitor thread and finally begins accepting actions.
+ */
+LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir)
+: FunctionRunner(BaseDir)
+, m_Log(logging::Get("local_exec"))
+, m_ChunkResolver(Resolver)
+, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers"))
+, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch"))
+{
+    const SystemMetrics Metrics = GetSystemMetricsForReporting();
+    m_MaxRunningActions = Metrics.LogicalProcessorCount * 2;
+
+    ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions);
+
+    // Empties Dir when it is an existing directory. A failure to clean is only logged.
+    // Returns whether a cleanup was attempted at all.
+    auto CleanIfDirectory = [&](const std::filesystem::path& Dir) -> bool {
+        if (!std::filesystem::is_directory(Dir))
+        {
+            return false;
+        }
+
+        ZEN_INFO("Cleaning '{}'", Dir);
+
+        std::error_code Ec;
+        CleanDirectory(Dir, /* ForceRemoveReadOnlyFiles */ true, Ec);
+
+        if (Ec)
+        {
+            ZEN_WARN("Unable to clean '{}': {}", Dir, Ec.message());
+        }
+
+        return true;
+    };
+
+    const bool CleanedActions = CleanIfDirectory(m_ActionsPath);
+    const bool CleanedSandboxes = CleanIfDirectory(m_SandboxPath);
+
+    // We clean out all workers on startup since we can't know they are good. They could be bad
+    // due to tampering, malware (which I also mean to include AV and antimalware software) or
+    // other processes we have no control over
+    const bool CleanedWorkers = CleanIfDirectory(m_WorkerPath);
+
+    if (CleanedActions || CleanedSandboxes || CleanedWorkers)
+    {
+        ZEN_INFO("Cleanup complete");
+    }
+
+    m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this};
+
+# if ZEN_PLATFORM_WINDOWS
+    // Suppress any error dialogs caused by missing dependencies
+    const UINT PreviousMode = ::SetErrorMode(0);
+    ::SetErrorMode(PreviousMode | SEM_FAILCRITICALERRORS);
+# endif
+
+    m_AcceptNewActions = true;
+}
+
+// Shuts the runner down. Exceptions derived from std::exception are logged rather than
+// allowed to escape the destructor.
+LocalProcessRunner::~LocalProcessRunner()
+{
+    try
+    {
+        Shutdown();
+    }
+    catch (const std::exception& ShutdownError)
+    {
+        ZEN_WARN("exception during local process runner shutdown: {}", ShutdownError.what());
+    }
+}
+
+// Stops accepting actions, asks the monitor thread to exit and joins it, then cancels
+// whatever is still running
+void
+LocalProcessRunner::Shutdown()
+{
+    m_AcceptNewActions.store(false);
+
+    // Clear the enable flag before signaling so the monitor thread sees it when it wakes up
+    m_MonitorThreadEnabled = false;
+    m_MonitorThreadEvent.Set();
+
+    if (m_MonitorThread.joinable())
+    {
+        m_MonitorThread.join();
+    }
+
+    CancelRunningActions();
+}
+
+// Creates and returns a new directory below the sandbox root, named after the next value
+// of the sandbox counter
+std::filesystem::path
+LocalProcessRunner::CreateNewSandbox()
+{
+    const int32_t SandboxIndex = ++m_SandboxCounter;
+
+    std::filesystem::path SandboxDir = m_SandboxPath / std::to_string(SandboxIndex);
+    zen::CreateDirectories(SandboxDir);
+
+    return SandboxDir;
+}
+
+// When action dumping is enabled, writes the worker descriptor, its manifested file tree and
+// the compressed chunks it references below the actions directory. Does nothing otherwise.
+void
+LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+    if (!m_DumpActions)
+    {
+        return;
+    }
+
+    CbObject Descriptor = WorkerPackage.GetObject();
+    const IoHash& WorkerId = WorkerPackage.GetObjectHash();
+
+    std::filesystem::path DumpDir = m_ActionsPath / fmt::format("worker_{}"sv, WorkerId);
+
+    zen::WriteFile(DumpDir / "worker.ucb", Descriptor.GetBuffer().AsIoBuffer());
+
+    // Every chunk referenced while manifesting is additionally stored in compressed form
+    auto StoreChunk = [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) {
+        zen::WriteFile(DumpDir / "chunks" / Cid.ToHexString(), ChunkBuffer.GetCompressed());
+    };
+
+    ManifestWorker(WorkerPackage, DumpDir / "tree", StoreChunk);
+
+    ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, DumpDir);
+}
+
+// Estimates how many more actions can be accepted right now: zero when the runner is not
+// accepting work or is at its limit, otherwise the distance to the limit
+size_t
+LocalProcessRunner::QueryCapacity()
+{
+    RwLock::SharedLockScope Lock{m_RunningLock};
+
+    if (!m_AcceptNewActions)
+    {
+        return 0;
+    }
+
+    const size_t Limit = size_t(m_MaxRunningActions);
+    const size_t InFlight = m_RunningMap.size();
+
+    return (InFlight >= Limit) ? 0 : (Limit - InFlight);
+}
+
+// Submits each action in order via SubmitAction and returns one result per action, in the
+// same order as the input
+std::vector<SubmitResult>
+LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    std::vector<SubmitResult> Results;
+
+    // The final size is known up front, so avoid repeated reallocation while appending
+    Results.reserve(Actions.size());
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Results.push_back(SubmitAction(Action));
+    }
+
+    return Results;
+}
+
+/**
+ * Launches a single action as a child process inside a freshly created sandbox directory
+ *
+ * The action object is written to "build.action" in the sandbox and every attachment it
+ * references is resolved through the chunk resolver and written below "Inputs". On Windows the
+ * worker executable is then started with the sandbox as its current directory and the action
+ * is added to the running map; other platforms are not implemented.
+ *
+ * Returns a result with IsAccepted == false when the runner is not accepting actions or is
+ * already at its concurrency limit.
+ *
+ * Throws when an input chunk is missing or the process cannot be launched. In that case the
+ * sandbox created for the action is removed again before the exception propagates.
+ */
+SubmitResult
+LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+    // Verify whether we can accept more work
+    //
+    // NOTE(review): the lock is released before the action is added to m_RunningMap further
+    // down, so concurrent submitters can each pass this check - confirm whether briefly
+    // exceeding the limit is acceptable
+
+    {
+        RwLock::SharedLockScope _{m_RunningLock};
+
+        if (!m_AcceptNewActions || m_RunningMap.size() >= size_t(m_MaxRunningActions))
+        {
+            return SubmitResult{.IsAccepted = false};
+        }
+    }
+
+    // Each enqueued action is assigned an integer index (logical sequence number),
+    // which we use as a key for tracking data structures and as an opaque id which
+    // may be used by clients to reference the scheduled action
+
+    const int32_t ActionLsn = Action->ActionLsn;
+    const CbObject& ActionObj = Action->ActionObj;
+    const IoHash ActionId = ActionObj.GetHash();
+
+    MaybeDumpAction(ActionLsn, ActionObj);
+
+    std::filesystem::path SandboxPath = CreateNewSandbox();
+
+# if ZEN_PLATFORM_WINDOWS
+    PROCESS_INFORMATION ProcessInformation{};
+# endif
+
+    // Everything up to and including the process launch may throw. The sandbox is of no use
+    // if that happens, so it is removed instead of lingering until the next service restart.
+
+    try
+    {
+        std::filesystem::path WorkerPath = ManifestWorker(Action->Worker);
+
+        // Write out action
+
+        zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer());
+
+        // Manifest inputs in sandbox
+
+        ActionObj.IterateAttachments([&](CbFieldView Field) {
+            const IoHash Cid = Field.AsHash();
+            std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()};
+            IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid);
+
+            if (!DataBuffer)
+            {
+                throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid));
+            }
+
+            zen::WriteFile(FilePath, DataBuffer);
+        });
+
+# if ZEN_PLATFORM_WINDOWS
+        // Set up environment variables. Each entry is NUL terminated and the block as a whole
+        // is terminated by additional NUL characters.
+
+        StringBuilder<1024> EnvironmentBlock;
+
+        const CbPackage& WorkerPackage = Action->Worker.Descriptor;
+        CbObject WorkerDescription = WorkerPackage.GetObject();
+
+        for (auto& It : WorkerDescription["environment"sv])
+        {
+            EnvironmentBlock.Append(It.AsString());
+            EnvironmentBlock.Append('\0');
+        }
+        EnvironmentBlock.Append('\0');
+        EnvironmentBlock.Append('\0');
+
+        // Execute process - this spawns the child process immediately without waiting
+        // for completion
+
+        std::string_view ExecPath = WorkerDescription["path"sv].AsString();
+        std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred();
+
+        ExtendableWideStringBuilder<512> CommandLine;
+        CommandLine.Append(L'"');
+        CommandLine.Append(ExePath.c_str());
+        CommandLine.Append(L'"');
+        CommandLine.Append(L" -Build=build.action");
+
+        LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr;
+        LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr;
+        BOOL bInheritHandles = FALSE;
+        DWORD dwCreationFlags = 0;
+
+        // The wide structure is spelled out since the wide entry point is called explicitly
+        STARTUPINFOW StartupInfo{};
+        StartupInfo.cb = sizeof StartupInfo;
+
+        ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str()));
+
+        CommandLine.EnsureNulTerminated();
+
+        BOOL Success = CreateProcessW(nullptr,
+                                      CommandLine.Data(),
+                                      lpProcessAttributes,
+                                      lpThreadAttributes,
+                                      bInheritHandles,
+                                      dwCreationFlags,
+                                      (LPVOID)EnvironmentBlock.Data(), // Environment block
+                                      SandboxPath.c_str(), // Current directory
+                                      &StartupInfo,
+                                      /* out */ &ProcessInformation);
+
+        if (!Success)
+        {
+            // TODO: this is probably not the best way to report failure. The return
+            // object should include a failure state and context
+
+            zen::ThrowLastError("Unable to launch process" /* TODO: Add context */);
+        }
+# else
+        ZEN_UNUSED(ActionId);
+
+        ZEN_NOT_IMPLEMENTED();
+# endif
+    }
+    catch (...)
+    {
+        std::error_code Ec;
+        DeleteDirectories(SandboxPath, Ec);
+
+        if (Ec)
+        {
+            ZEN_WARN("Unable to delete sandbox directory '{}': {}", SandboxPath, Ec.message());
+        }
+
+        throw;
+    }
+
+# if ZEN_PLATFORM_WINDOWS
+    // Only the process handle is needed from here on
+    CloseHandle(ProcessInformation.hThread);
+
+    Ref<RunningAction> NewAction{new RunningAction()};
+    NewAction->Action = Action;
+    NewAction->ProcessHandle = ProcessInformation.hProcess;
+    NewAction->SandboxPath = std::move(SandboxPath);
+
+    {
+        RwLock::ExclusiveLockScope _(m_RunningLock);
+
+        m_RunningMap[ActionLsn] = std::move(NewAction);
+    }
+
+    Action->SetActionState(RunnerAction::State::Running);
+# endif
+
+    return SubmitResult{.IsAccepted = true};
+}
+
+// Returns the number of entries currently held in the running map
+size_t
+LocalProcessRunner::GetSubmittedActionCount()
+{
+    size_t RunningCount = 0;
+
+    m_RunningLock.WithSharedLock([&] { RunningCount = m_RunningMap.size(); });
+
+    return RunningCount;
+}
+
+/**
+ * Returns the directory holding the files of the given worker, manifesting it first if it
+ * does not exist yet
+ *
+ * Existence is checked under the shared lock first and once more under the exclusive lock,
+ * since another thread may have manifested the worker in between. If manifesting fails the
+ * partially written directory is removed again before the exception propagates, as later
+ * calls would otherwise take its mere existence to mean that the worker is complete.
+ */
+std::filesystem::path
+LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker)
+{
+    std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId);
+
+    {
+        RwLock::SharedLockScope SharedLock(m_WorkerLock);
+
+        if (std::filesystem::exists(WorkerDir))
+        {
+            return WorkerDir;
+        }
+    }
+
+    RwLock::ExclusiveLockScope ExclusiveLock(m_WorkerLock);
+
+    if (!std::filesystem::exists(WorkerDir))
+    {
+        try
+        {
+            ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {});
+        }
+        catch (...)
+        {
+            std::error_code Ec;
+            DeleteDirectories(WorkerDir, Ec);
+
+            if (Ec)
+            {
+                ZEN_WARN("Unable to delete incomplete worker directory '{}': {}", WorkerDir, Ec.message());
+            }
+
+            throw;
+        }
+    }
+
+    return WorkerDir;
+}
+
+/**
+ * Writes the decompressed content of a single file entry below SandboxRootPath
+ *
+ * FileEntry supplies the relative "name", the chunk "hash" and the expected raw "size". The
+ * compressed chunk is taken from FromPackage when it is attached there and from the chunk
+ * resolver otherwise. ChunkReferenceCallback is invoked with the chunk hash and compressed
+ * buffer before the file is written.
+ *
+ * Throws std::runtime_error when the chunk cannot be found, is not a valid compressed buffer,
+ * or (for chunks obtained from the resolver) its raw size differs from the expected size.
+ */
+void
+LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage,
+                                               CbObjectView FileEntry,
+                                               const std::filesystem::path& SandboxRootPath,
+                                               std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback)
+{
+    std::string_view Name = FileEntry["name"sv].AsString();
+    const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
+    const uint64_t Size = FileEntry["size"sv].AsUInt64();
+
+    CompressedBuffer Compressed;
+
+    if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash))
+    {
+        Compressed = Attachment->AsCompressedBinary();
+    }
+    else
+    {
+        IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash);
+
+        if (!DataBuffer)
+        {
+            throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash));
+        }
+
+        uint64_t DataRawSize = 0;
+        IoHash DataRawHash;
+        Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize);
+
+        if (DataRawSize != Size)
+        {
+            // Report the raw size, since that is the value which was compared
+            throw std::runtime_error(fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataRawSize, Size));
+        }
+    }
+
+    // Neither source guarantees a well formed compressed buffer
+    if (!Compressed)
+    {
+        throw std::runtime_error(fmt::format("worker chunk '{}' is not a valid compressed buffer", ChunkHash));
+    }
+
+    ChunkReferenceCallback(ChunkHash, Compressed);
+
+    // NOTE(review): Name is taken from the worker descriptor and joined without validation, so
+    // a name containing ".." components would resolve outside of SandboxRootPath - confirm that
+    // descriptors are trusted
+    std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()};
+
+    SharedBuffer Decompressed = Compressed.Decompress();
+    zen::WriteFile(FilePath, Decompressed.AsIoBuffer());
+}
+
+/**
+ * Lays out the content of a worker package below SandboxPath
+ *
+ * The entries listed under "executables" are written first, then the directories listed
+ * under "dirs" are created, then the entries listed under "files" are written, and finally
+ * the worker description itself is stored as "worker.zcb". ChunkReferenceCallback is passed
+ * on for every file entry that gets written.
+ */
+void
+LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage,
+                                   const std::filesystem::path& SandboxPath,
+                                   std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback)
+{
+    CbObject Description = WorkerPackage.GetObject();
+
+    // Writes every file entry of the named list into the sandbox
+    auto WriteFileEntries = [&](std::string_view ListName) {
+        for (auto& Entry : Description[ListName])
+        {
+            DecompressAttachmentToFile(WorkerPackage, Entry.AsObjectView(), SandboxPath, ChunkReferenceCallback);
+        }
+    };
+
+    WriteFileEntries("executables"sv);
+
+    for (auto& Entry : Description["dirs"sv])
+    {
+        std::string_view RelativeDir = Entry.AsString();
+        zen::CreateDirectories(SandboxPath / std::filesystem::path(RelativeDir).make_preferred());
+    }
+
+    WriteFileEntries("files"sv);
+
+    WriteFile(SandboxPath / "worker.zcb", Description.GetBuffer().AsIoBuffer());
+}
+
+/**
+ * Builds the result package of a finished action from the content of its sandbox
+ *
+ * Reads "build.output" from the sandbox and attaches every attachment it references, each of
+ * which is read from the "Outputs" directory where it is stored under its hash.
+ *
+ * Throws std::system_error when a file cannot be read, and std::runtime_error when an output
+ * is not in compressed buffer format.
+ */
+CbPackage
+LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath)
+{
+    std::filesystem::path ResultFile = SandboxPath / "build.output";
+    FileContents ResultData = zen::ReadFile(ResultFile);
+
+    if (ResultData.ErrorCode)
+    {
+        throw std::system_error(ResultData.ErrorCode, fmt::format("Failed to read build output file '{}'", ResultFile));
+    }
+
+    CbPackage ResultPackage;
+    CbObject ResultObject = zen::LoadCompactBinaryObject(ResultData.Flatten());
+
+    uint64_t CompressedByteCount = 0;
+    uint64_t RawByteCount = 0;
+
+    auto AttachOutput = [&](CbFieldView AttachmentField) {
+        IoHash AttachmentHash = AttachmentField.AsHash();
+        std::filesystem::path AttachmentFile{SandboxPath / "Outputs" / AttachmentHash.ToHexString()};
+        FileContents AttachmentData = zen::ReadFile(AttachmentFile);
+
+        if (AttachmentData.ErrorCode)
+        {
+            throw std::system_error(AttachmentData.ErrorCode, fmt::format("Failed to read build output file '{}'", AttachmentFile));
+        }
+
+        uint64_t RawSize = 0;
+        IoHash RawHash;
+        CompressedBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData.Flatten()), RawHash, RawSize);
+
+        if (!Payload)
+        {
+            throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
+        }
+
+        // Sizes have to be read before the buffer is moved into the attachment
+        CompressedByteCount += Payload.GetCompressedSize();
+        RawByteCount += RawSize;
+
+        CbAttachment Attachment(std::move(Payload), RawHash);
+        ResultPackage.AddAttachment(Attachment);
+    };
+
+    ResultObject.IterateAttachments(AttachOutput);
+
+    ResultPackage.SetObject(ResultObject);
+
+    ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
+              ResultPackage.GetAttachments().size(),
+              NiceBytes(CompressedByteCount),
+              NiceBytes(RawByteCount));
+
+    return ResultPackage;
+}
+
+/**
+ * Body of the monitor thread
+ *
+ * Repeatedly waits for up to a second for something to happen and then sweeps the running
+ * actions. The thread exits once m_MonitorThreadEnabled has been cleared.
+ */
+void
+LocalProcessRunner::MonitorThreadFunction()
+{
+    SetCurrentThreadName("LocalProcessRunner_Monitor");
+
+    auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
+    do
+    {
+        // On Windows it's possible to wait on process handles, so we wait for either a process to exit
+        // or for the monitor event to be signaled (which indicates we should check for cancellation
+        // or shutdown). This could be further improved by using a completion port and registering process
+        // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing
+        // with an enormous number of concurrent processes.
+        //
+        // On other platforms we just wait on the monitor event and poll for process exits at intervals.
+# if ZEN_PLATFORM_WINDOWS
+        auto WaitOnce = [&]() -> bool {
+            HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS];
+
+            uint32_t NumHandles = 0;
+
+            WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle();
+
+            // At most MAXIMUM_WAIT_OBJECTS handles can be waited on at once. Processes beyond
+            // that are picked up by the sweep which follows every wait.
+            m_RunningLock.WithSharedLock([&] {
+                for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It)
+                {
+                    Ref<RunningAction> Action = It->second;
+
+                    WaitHandles[NumHandles++] = Action->ProcessHandle;
+                }
+            });
+
+            const DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000);
+
+            if (WaitResult == WAIT_FAILED)
+            {
+                // A failed wait returns immediately. Fall back to waiting on the event alone so
+                // that the loop keeps its pacing instead of spinning.
+                ZEN_WARN("wait for running processes failed: {}", GetSystemErrorAsString(GetLastError()));
+
+                return m_MonitorThreadEvent.Wait(1000);
+            }
+
+            // return true if a handle was signaled; the index of a signaled handle is always
+            // below NumHandles
+            return WaitResult < (WAIT_OBJECT_0 + NumHandles);
+        };
+# else
+        auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); };
+# endif
+
+        while (!WaitOnce())
+        {
+            if (m_MonitorThreadEnabled == false)
+            {
+                return;
+            }
+
+            SweepRunningActions();
+        }
+
+        // Signal received
+
+        SweepRunningActions();
+    } while (m_MonitorThreadEnabled);
+}
+
+/**
+ * Terminates every process that is still running and marks the affected actions as failed
+ *
+ * The running map is emptied up front. Termination is requested for all processes first and
+ * only then is each of them waited for, its sandbox removed and its action marked as failed.
+ * Finally every process handle that is still open is closed, as nothing else references
+ * these entries once the map has been emptied.
+ */
+void
+LocalProcessRunner::CancelRunningActions()
+{
+    Stopwatch Timer;
+    std::unordered_map<int, Ref<RunningAction>> RunningMap;
+
+    m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); });
+
+    if (RunningMap.empty())
+    {
+        return;
+    }
+
+    ZEN_INFO("cancelling all running actions");
+
+    // For expedience we initiate the process termination for all known
+    // processes before attempting to wait for them to exit.
+
+    std::vector<int> TerminatedLsnList;
+
+    for (const auto& Kv : RunningMap)
+    {
+        Ref<RunningAction> Action = Kv.second;
+
+        // Terminate running process
+
+# if ZEN_PLATFORM_WINDOWS
+        BOOL Success = TerminateProcess(Action->ProcessHandle, 222);
+
+        if (Success)
+        {
+            TerminatedLsnList.push_back(Kv.first);
+        }
+        else
+        {
+            DWORD LastError = GetLastError();
+
+            if (LastError != ERROR_ACCESS_DENIED)
+            {
+                ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError));
+            }
+        }
+# else
+        ZEN_NOT_IMPLEMENTED("need to implement process termination");
+# endif
+    }
+
+    // We only post results for processes we have terminated, in order
+    // to avoid multiple results getting posted for the same action
+
+    for (int Lsn : TerminatedLsnList)
+    {
+        if (auto It = RunningMap.find(Lsn); It != RunningMap.end())
+        {
+            Ref<RunningAction> Running = It->second;
+
+# if ZEN_PLATFORM_WINDOWS
+            if (Running->ProcessHandle != INVALID_HANDLE_VALUE)
+            {
+                DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000);
+
+                if (WaitResult != WAIT_OBJECT_0)
+                {
+                    ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult);
+                }
+                else
+                {
+                    ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn);
+                }
+            }
+# endif
+
+            // Clean up and post error result
+
+            DeleteDirectories(Running->SandboxPath);
+            Running->Action->SetActionState(RunnerAction::State::Failed);
+        }
+    }
+
+# if ZEN_PLATFORM_WINDOWS
+    // Release the process handles, including those of processes which could not be terminated
+
+    for (const auto& Kv : RunningMap)
+    {
+        Ref<RunningAction> Running = Kv.second;
+
+        if (Running->ProcessHandle != nullptr && Running->ProcessHandle != INVALID_HANDLE_VALUE)
+        {
+            CloseHandle(Running->ProcessHandle);
+            Running->ProcessHandle = INVALID_HANDLE_VALUE;
+        }
+    }
+# endif
+
+    ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+/**
+ * Collects the actions whose process has exited and posts their outcome
+ *
+ * Exited processes are removed from the running map together with their exit code. For a
+ * zero exit code the outputs are gathered from the sandbox and the action is completed. A
+ * non-zero exit code, or a failure while gathering outputs, marks the action as failed. The
+ * sandbox directory is removed in both cases.
+ */
+void
+LocalProcessRunner::SweepRunningActions()
+{
+    std::vector<Ref<RunningAction>> CompletedActions;
+
+    m_RunningLock.WithExclusiveLock([&] {
+        // TODO: It would be good to not hold the exclusive lock while making
+        // system calls and other expensive operations.
+
+        for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;)
+        {
+            Ref<RunningAction> Action = It->second;
+
+# if ZEN_PLATFORM_WINDOWS
+            DWORD ExitCode = 0;
+            BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode);
+
+            if (IsSuccess && ExitCode != STILL_ACTIVE)
+            {
+                // Remember the exit code, it decides below whether outputs are gathered
+                Action->ExitCode = static_cast<int>(ExitCode);
+
+                CloseHandle(Action->ProcessHandle);
+                Action->ProcessHandle = INVALID_HANDLE_VALUE;
+
+                CompletedActions.push_back(std::move(Action));
+                It = m_RunningMap.erase(It);
+            }
+            else
+            {
+                ++It;
+            }
+# else
+            // TODO: implement properly for Mac/Linux
+
+            ZEN_UNUSED(Action);
+
+            // Without this the loop would never terminate for a non-empty map
+            ++It;
+# endif
+        }
+    });
+
+    // Notify outer. Note that this has to be done without holding any local locks
+    // otherwise we may end up with deadlocks.
+
+    for (Ref<RunningAction> Running : CompletedActions)
+    {
+        const int ActionLsn = Running->Action->ActionLsn;
+
+        if (Running->ExitCode == 0)
+        {
+            try
+            {
+                // Gather outputs
+
+                CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath);
+
+                Running->Action->SetResult(std::move(OutputPackage));
+                Running->Action->SetActionState(RunnerAction::State::Completed);
+
+                // We can delete the files at this point
+                if (!DeleteDirectories(Running->SandboxPath))
+                {
+                    ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath);
+                }
+
+                // Success -- continue with next iteration of the loop
+                continue;
+            }
+            catch (std::exception& Ex)
+            {
+                ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what());
+            }
+        }
+        else
+        {
+            ZEN_WARN("Process for action lsn {} exited with code {}", ActionLsn, Running->ExitCode);
+        }
+
+        // Failed - for now this is indicated with an empty package in
+        // the results map. We can clean out the sandbox directory immediately.
+
+        std::error_code Ec;
+        DeleteDirectories(Running->SandboxPath, Ec);
+
+        if (Ec)
+        {
+            ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message());
+        }
+
+        Running->Action->SetActionState(RunnerAction::State::Failed);
+    }
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h
new file mode 100644
index 000000000..35f464805
--- /dev/null
+++ b/src/zencompute/localrunner.h
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/thread.h>
+# include <zencore/zencore.h>
+# include <zenstore/cidstore.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+
+# include <atomic>
+# include <filesystem>
+# include <thread>
+
+namespace zen {
+class CbPackage;
+}
+
+namespace zen::compute {
+
+/** Direct process spawner
+
+ This runner simply sets up a directory structure for each job and
+ creates a process to perform the computation in it. It is not very
+ efficient and is intended mostly for testing.
+
+ */
+
+class LocalProcessRunner : public FunctionRunner
+{
+ // Move construction/assignment are explicitly disabled
+ LocalProcessRunner(LocalProcessRunner&&) = delete;
+ LocalProcessRunner& operator=(LocalProcessRunner&&) = delete;
+
+public:
+ LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir);
+ ~LocalProcessRunner();
+
+ // FunctionRunner interface
+ virtual void Shutdown() override;
+ virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ // The local runner always reports itself as healthy
+ [[nodiscard]] virtual bool IsHealthy() override { return true; }
+ [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
+ [[nodiscard]] virtual size_t QueryCapacity() override;
+ [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
+
+protected:
+ LoggerRef Log() { return m_Log; }
+
+ LoggerRef m_Log;
+
+ // Book-keeping for a single action which has been handed to a process
+ struct RunningAction : public RefCounted
+ {
+ Ref<RunnerAction> Action;
+ // Opaque process handle. On Windows the sweep code passes this to
+ // GetExitCodeProcess/CloseHandle and resets it to INVALID_HANDLE_VALUE
+ void* ProcessHandle = nullptr;
+ // Exit code of the process; the sweep treats zero as success.
+ // NOTE(review): the Windows sweep code visible in localrunner.cpp reads the
+ // exit code into a local variable and does not appear to store it here --
+ // confirm this field is actually updated somewhere
+ int ExitCode = 0;
+ // Directory the action runs in; outputs are gathered from here and the
+ // directory is deleted once the action has been retired
+ std::filesystem::path SandboxPath;
+ };
+
+ // NOTE(review): no in-class initializer -- presumably set by the constructor, confirm
+ std::atomic_bool m_AcceptNewActions;
+ ChunkResolver& m_ChunkResolver;
+ RwLock m_WorkerLock;
+ std::filesystem::path m_WorkerPath;
+ std::atomic<int32_t> m_SandboxCounter = 0;
+ std::filesystem::path m_SandboxPath;
+ int32_t m_MaxRunningActions = 64; // arbitrary limit for testing
+
+ // if used in conjunction with m_ResultsLock, this lock must be taken *after*
+ // m_ResultsLock to avoid deadlocks
+ // NOTE(review): m_ResultsLock is not declared in this class -- confirm where it lives
+ RwLock m_RunningLock;
+ std::unordered_map<int, Ref<RunningAction>> m_RunningMap;
+
+ // Background monitoring thread state
+ std::thread m_MonitorThread;
+ std::atomic<bool> m_MonitorThreadEnabled{true};
+ Event m_MonitorThreadEvent;
+ void MonitorThreadFunction();
+ void SweepRunningActions();
+ void CancelRunningActions();
+
+ // Sandbox / worker materialization helpers
+ std::filesystem::path CreateNewSandbox();
+ void ManifestWorker(const CbPackage& WorkerPackage,
+ const std::filesystem::path& SandboxPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback);
+ std::filesystem::path ManifestWorker(const WorkerDesc& Worker);
+ CbPackage GatherActionOutputs(std::filesystem::path SandboxPath);
+
+ void DecompressAttachmentToFile(const CbPackage& FromPackage,
+ CbObjectView FileEntry,
+ const std::filesystem::path& SandboxRootPath,
+ std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback);
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp
new file mode 100644
index 000000000..1c1a119cf
--- /dev/null
+++ b/src/zencompute/recordingreader.cpp
@@ -0,0 +1,335 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/recordingreader.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# define ZEN_CONCRT_AVAILABLE 1
+#else
+# define ZEN_CONCRT_AVAILABLE 0
+#endif
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+# if ZEN_PLATFORM_WINDOWS
+# define ZEN_BUILD_ACTION L"Build.action"
+# define ZEN_WORKER_UCB L"worker.ucb"
+# else
+# define ZEN_BUILD_ACTION "Build.action"
+# define ZEN_WORKER_UCB "worker.ucb"
+# endif
+
+//////////////////////////////////////////////////////////////////////////
+
+/** Tree visitor which locates recorded work and worker directories
+
+    Every directory containing a `Build.action` file is collected into WorkDirs,
+    and every directory containing a `worker.ucb` file into WorkerDirs. All
+    directories are recursed into.
+
+ */
+struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor
+{
+    virtual void VisitFile(const std::filesystem::path& Parent,
+                           const path_view&             File,
+                           uint64_t                     FileSize,
+                           uint32_t                     NativeModeOrAttributes,
+                           uint64_t                     NativeModificationTick) override
+    {
+        // Only the file name and its parent directory are of interest here
+        ZEN_UNUSED(FileSize, NativeModeOrAttributes, NativeModificationTick);
+
+        if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0)
+        {
+            WorkDirs.push_back(Parent);
+        }
+        else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0)
+        {
+            WorkerDirs.push_back(Parent);
+        }
+    }
+
+    virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes) override
+    {
+        ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes);
+
+        // Always descend into subdirectories
+        return true;
+    }
+
+    std::vector<std::filesystem::path> WorkerDirs;  // directories containing worker.ucb
+    std::vector<std::filesystem::path> WorkDirs;    // directories containing Build.action
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+/** Invoke Func for every element of Array, optionally in parallel
+
+    When ConcRT is available and TargetParallelism is greater than one, the
+    elements are processed via concurrency::parallel_for_each, split into chunks
+    of roughly Array.size() / TargetParallelism elements. In that case Func may
+    be invoked concurrently from multiple threads. Otherwise the elements are
+    processed sequentially, in order, on the calling thread.
+
+    The container is taken by reference, it is not copied.
+
+ */
+void
+IterateOverArray(const auto& Array, auto Func, int TargetParallelism)
+{
+# if ZEN_CONCRT_AVAILABLE
+    if (TargetParallelism > 1)
+    {
+        // simple_partitioner does not accept a chunk size of zero, which is what
+        // the division yields when there are fewer items than the target parallelism
+        const size_t ItemsPerChunk = Array.size() / size_t(TargetParallelism);
+
+        concurrency::simple_partitioner Chunker(ItemsPerChunk ? ItemsPerChunk : 1);
+        concurrency::parallel_for_each(
+            begin(Array),
+            end(Array),
+            [&](const auto& Item) { Func(Item); },
+            Chunker);
+
+        return;
+    }
+# else
+    ZEN_UNUSED(TargetParallelism);
+# endif
+
+    for (const auto& Item : Array)
+    {
+        Func(Item);
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Out-of-line defaulted destructor for the recording reader base class
+RecordingReaderBase::~RecordingReaderBase() = default;
+
+//////////////////////////////////////////////////////////////////////////
+
+// Opens the recording located at RecordingPath. The CID store backing the
+// recording is initialized from the "cid" subdirectory of that path.
+RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath)
+{
+ CidStoreConfiguration CidConfig;
+ CidConfig.RootDirectory = m_RecordingLogDir / "cid";
+ CidConfig.HugeValueThreshold = 128 * 1024 * 1024; // 128 MiB
+
+ m_CidStore.Initialize(CidConfig);
+}
+
+// Flushes the CID store before the reader goes away
+RecordingReader::~RecordingReader()
+{
+ m_CidStore.Flush();
+}
+
+// Returns the number of action entries collected so far. Entries are appended
+// by ScanActions(), so this is zero until a scan has been performed.
+size_t
+RecordingReader::GetActionCount() const
+{
+ return m_Actions.size();
+}
+
+// Resolve a chunk from the recording's CID store. A failed lookup is logged as
+// an error and yields an empty buffer.
+IoBuffer
+RecordingReader::FindChunkByCid(const IoHash& DecompressedId)
+{
+    IoBuffer Found = m_CidStore.FindChunkByCid(DecompressedId);
+
+    if (!Found)
+    {
+        ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId);
+
+        return {};
+    }
+
+    return Found;
+}
+
+// Loads all recorded workers and returns them keyed on worker id.
+//
+// The table of contents is read from "workers.ztoc" and the worker descriptors
+// from "workers.zdat" in the recording directory. Each returned package holds
+// the worker descriptor object plus every attachment which could be found in
+// the CID store.
+//
+// As a side effect this also calls ScanActions().
+std::unordered_map<zen::IoHash, zen::CbPackage>
+RecordingReader::ReadWorkers()
+{
+ std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
+
+ {
+ CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc");
+ CbObject Toc = TocFile.Object;
+
+ m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead);
+
+ // Only version 1 of the TOC format is understood
+ ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
+
+ for (auto& It : Toc["toc"])
+ {
+ // Each TOC entry is an array of (worker id, offset, size), read in that order
+ CbArrayView Entry = It.AsArrayView();
+ CbFieldViewIterator Vit = Entry.CreateViewIterator();
+
+ const IoHash WorkerId = Vit++->AsHash();
+ const uint64_t Offset = Vit++->AsInt64(0);
+ const uint64_t Size = Vit++->AsInt64(0);
+
+ IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size);
+ CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange);
+ CbPackage& WorkerPkg = WorkerMap[WorkerId];
+ WorkerPkg.SetObject(WorkerDesc);
+
+ WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid);
+
+ // Attachments which cannot be found are skipped without any diagnostic
+ if (AttachmentData)
+ {
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
+ }
+ });
+ }
+ }
+
+ // Scan actions as well (this should be called separately, ideally)
+
+ ScanActions();
+
+ return WorkerMap;
+}
+
+// Reads the action table of contents ("actions.ztoc") and opens the action
+// data file ("actions.zdat") for subsequent reads by IterateActions().
+//
+// NOTE(review): entries are appended to m_Actions without clearing it first, so
+// scanning more than once looks like it would produce duplicates -- confirm
+// this is only ever invoked once per reader.
+void
+RecordingReader::ScanActions()
+{
+ CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc");
+ CbObject Toc = TocFile.Object;
+
+ m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead);
+
+ // Only version 1 of the TOC format is understood
+ ZEN_ASSERT(Toc["version"sv].AsInt32() == 1);
+
+ for (auto& It : Toc["toc"])
+ {
+ // Each TOC entry is an array of (action id, offset, size), read in that order
+ CbArrayView ArrayEntry = It.AsArrayView();
+ CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator();
+
+ ActionEntry Entry;
+ Entry.ActionId = Vit++->AsHash();
+ Entry.Offset = Vit++->AsInt64(0);
+ Entry.Size = Vit++->AsInt64(0);
+
+ m_Actions.push_back(Entry);
+ }
+}
+
+// Invokes Callback once for every scanned action, passing the action object
+// loaded from the action data file along with its id.
+//
+// TargetParallelism is forwarded to IterateOverArray, which may run the
+// callback (and the reads from m_ActionDataFile) concurrently on multiple
+// threads when ConcRT is available and the value is greater than one.
+// NOTE(review): this presumes BasicFile::ReadRange is safe to call
+// concurrently -- confirm.
+void
+RecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism)
+{
+ IterateOverArray(
+ m_Actions,
+ [&](const ActionEntry& Entry) {
+ CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size));
+
+ Callback(ActionDesc, Entry.ActionId);
+ },
+ TargetParallelism);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Look up a previously added attachment, under a shared lock. Yields an empty
+// buffer if no attachment has been registered under the given id.
+IoBuffer
+LocalResolver::FindChunkByCid(const IoHash& DecompressedId)
+{
+    RwLock::SharedLockScope _(MapLock);
+
+    auto Found = Attachments.find(DecompressedId);
+
+    if (Found == Attachments.end())
+    {
+        return {};
+    }
+
+    return Found->second;
+}
+
+// Registers (or replaces) the attachment stored under Cid, under an exclusive
+// lock. The buffer is tagged as compressed binary content before it is stored.
+void
+LocalResolver::Add(const IoHash& Cid, IoBuffer Data)
+{
+ RwLock::ExclusiveLockScope _(MapLock);
+ Data.SetContentType(ZenContentType::kCompressedBinary);
+ Attachments[Cid] = Data;
+}
+
+///
+
+// Remembers the recording directory; nothing is read until ReadWorkers() is called
+UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath)
+{
+}
+
+// Nothing to release explicitly
+UeRecordingReader::~UeRecordingReader()
+{
+}
+
+// Returns the number of work directories found. The list is populated by
+// ReadWorkers(), so this is zero until that has been called.
+size_t
+UeRecordingReader::GetActionCount() const
+{
+ return m_WorkDirs.size();
+}
+
+// Resolves a chunk via the local resolver, which only knows about attachments
+// registered by ReadAction()
+IoBuffer
+UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId)
+{
+ return m_LocalResolver.FindChunkByCid(DecompressedId);
+}
+
+// Traverses the recording directory, remembers all work directories for later
+// use by IterateActions(), and returns a package for every worker directory
+// found, keyed on the hash of its "worker.ucb" descriptor.
+std::unordered_map<zen::IoHash, zen::CbPackage>
+UeRecordingReader::ReadWorkers()
+{
+ std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap;
+
+ FileSystemTraversal Traversal;
+ RecordingTreeVisitor Visitor;
+ Traversal.TraverseFileSystem(m_RecordingDir, Visitor);
+
+ m_WorkDirs = std::move(Visitor.WorkDirs);
+
+ for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs)
+ {
+ CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb");
+ CbObject WorkerDesc = WorkerFile.Object;
+ const IoHash& WorkerId = WorkerFile.Hash;
+ CbPackage& WorkerPkg = WorkerMap[WorkerId];
+ WorkerPkg.SetObject(WorkerDesc);
+
+ // Attachments are read from the "chunks" subdirectory, in files named after
+ // the attachment hash.
+ // NOTE(review): unlike RecordingReader::ReadWorkers there is no check that
+ // the attachment data was actually found -- confirm how ReadFile behaves
+ // for a missing file
+ WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten();
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash));
+ });
+ }
+
+ return WorkerMap;
+}
+
+// Invokes Callback once for every work directory found by ReadWorkers(),
+// passing the action object read from that directory along with its hash.
+//
+// ParallelismTarget is forwarded to IterateOverArray, which may run the
+// callback concurrently on multiple threads when ConcRT is available and the
+// value is greater than one.
+void
+UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget)
+{
+ IterateOverArray(
+ m_WorkDirs,
+ [&](const std::filesystem::path& WorkDir) {
+ CbPackage WorkPackage = ReadAction(WorkDir);
+ CbObject ActionObject = WorkPackage.GetObject();
+ const IoHash& ActionId = WorkPackage.GetObjectHash();
+
+ Callback(ActionObject, ActionId);
+ },
+ ParallelismTarget);
+}
+
+// Builds a package for the action stored in WorkDir.
+//
+// The action object is loaded from "Build.action" and each attachment it
+// references is read from the "inputs" subdirectory, from a file named after
+// the attachment hash. Every attachment is also registered with the local
+// resolver so it can later be served by FindChunkByCid().
+CbPackage
+UeRecordingReader::ReadAction(std::filesystem::path WorkDir)
+{
+ CbPackage WorkPackage;
+ std::filesystem::path WorkDescPath = WorkDir / "Build.action";
+ CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath);
+ CbObject& ActionObject = ActionFile.Object;
+
+ WorkPackage.SetObject(ActionObject);
+
+ ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) {
+ const IoHash AttachmentCid = AttachmentField.GetValue().AsHash();
+ IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten();
+
+ m_LocalResolver.Add(AttachmentCid, AttachmentData);
+
+ IoHash RawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize);
+ // The hash stored in the compressed data must match the hash referenced by the action
+ ZEN_ASSERT(AttachmentCid == RawHash);
+ WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash));
+ });
+
+ return WorkPackage;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zencompute/remotehttprunner.cpp b/src/zencompute/remotehttprunner.cpp
new file mode 100644
index 000000000..98ced5fe8
--- /dev/null
+++ b/src/zencompute/remotehttprunner.cpp
@@ -0,0 +1,457 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "remotehttprunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/scopeguard.h>
+# include <zenhttp/httpcommon.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::compute {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+// Sets up an HTTP client targeting "<HostName>/apply" and starts the
+// monitoring thread, which begins running MonitorThreadFunction() right away.
+RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName)
+: FunctionRunner(BaseDir)
+, m_Log(logging::Get("http_exec"))
+, m_ChunkResolver{InChunkResolver}
+, m_BaseUrl{fmt::format("{}/apply", HostName)}
+, m_Http(m_BaseUrl)
+{
+ // Started in the constructor body, i.e. after all members have been initialized
+ m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
+}
+
+// Stops and joins the monitoring thread via Shutdown()
+RemoteHttpRunner::~RemoteHttpRunner()
+{
+ Shutdown();
+}
+
+// Asks the monitoring thread to exit and waits for it to finish. Calling this
+// again after the thread has been joined skips the join (joinable() check).
+void
+RemoteHttpRunner::Shutdown()
+{
+ // TODO: should cleanly drain/cancel pending work
+
+ // Clear the flag before signalling so the thread sees it when it wakes up
+ m_MonitorThreadEnabled = false;
+ m_MonitorThreadEvent.Set();
+ if (m_MonitorThread.joinable())
+ {
+ m_MonitorThread.join();
+ }
+}
+
+// Makes sure the remote end knows about the given worker.
+//
+// The remote is first asked whether it already has the worker. If not, the
+// worker descriptor is posted; should the remote respond that it is missing
+// attachments, a package containing only the attachments it listed under
+// "need" is posted as a follow-up.
+//
+// Failures are logged but not reported to the caller.
+void
+RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
+{
+    const IoHash      WorkerId  = WorkerPackage.GetObjectHash();
+    const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId);
+
+    HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl);
+
+    if (WorkerResponse.StatusCode == HttpResponseCode::NotFound)
+    {
+        HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerPackage.GetObject());
+
+        if (DescResponse.StatusCode == HttpResponseCode::NotFound)
+        {
+            // Build response package by sending only the attachments
+            // the other end needs. We start with the full package and
+            // remove the attachments which are not needed. This is the
+            // only place which needs a mutable copy of the package.
+
+            CbPackage Pkg = WorkerPackage;
+
+            {
+                std::unordered_set<IoHash> Needed;
+
+                CbObject Response = DescResponse.AsObject();
+
+                for (auto& Item : Response["need"sv])
+                {
+                    const IoHash NeedHash = Item.AsHash();
+
+                    Needed.insert(NeedHash);
+                }
+
+                // Collected up front since attachments cannot be removed
+                // while the attachment list is being iterated
+                std::unordered_set<IoHash> ToRemove;
+
+                for (const CbAttachment& Attachment : Pkg.GetAttachments())
+                {
+                    const IoHash& Hash = Attachment.GetHash();
+
+                    if (Needed.find(Hash) == Needed.end())
+                    {
+                        ToRemove.insert(Hash);
+                    }
+                }
+
+                for (const IoHash& Hash : ToRemove)
+                {
+                    int RemovedCount = Pkg.RemoveAttachment(Hash);
+
+                    ZEN_ASSERT(RemovedCount == 1);
+                }
+            }
+
+            // Post resulting package
+
+            HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg);
+
+            if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
+            {
+                ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
+
+                // TODO: propagate error
+            }
+        }
+        else if (!IsHttpSuccessCode(DescResponse.StatusCode))
+        {
+            ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
+
+            // TODO: propagate error
+        }
+        else
+        {
+            // The only success response expected for the descriptor is "no content"
+            ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent);
+        }
+    }
+    else if (WorkerResponse.StatusCode == HttpResponseCode::OK)
+    {
+        // Already known from a previous run
+    }
+    else if (!IsHttpSuccessCode(WorkerResponse.StatusCode))
+    {
+        ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})",
+                  WorkerId,
+                  m_Http.GetBaseUri(),
+                  WorkerUrl,
+                  (int)WorkerResponse.StatusCode,
+                  ToString(WorkerResponse.StatusCode));
+
+        // TODO: propagate error
+    }
+}
+
+// Estimate how many more actions this runner is prepared to accept, i.e. the
+// configured maximum minus the number of actions currently tracked as running
+// remotely (never less than zero).
+size_t
+RemoteHttpRunner::QueryCapacity()
+{
+    RwLock::SharedLockScope _{m_RunningLock};
+
+    const size_t Limit    = size_t(m_MaxRunningActions);
+    const size_t InFlight = m_RemoteRunningMap.size();
+
+    return (InFlight >= Limit) ? 0 : (Limit - InFlight);
+}
+
+// Submits each action in turn via SubmitAction(). The returned vector holds
+// one result per input action, in the same order.
+std::vector<SubmitResult>
+RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    std::vector<SubmitResult> Results;
+    Results.reserve(Actions.size());
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Results.push_back(SubmitAction(Action));
+    }
+
+    return Results;
+}
+
+// Submits a single action to the remote end.
+//
+// The action object is posted to "/jobs". If the remote reports that it is
+// missing attachments, those listed under "need" are resolved via the chunk
+// resolver and posted as a package. On success the remote returns an "lsn",
+// which is used as the key for tracking the action until the monitoring
+// thread observes its completion.
+//
+// Returns a result with IsAccepted set if the action is now tracked as
+// running, otherwise a rejection (with a reason where one is known).
+SubmitResult
+RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
+{
+    // Verify whether we can accept more work
+
+    {
+        RwLock::SharedLockScope _{m_RunningLock};
+        if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
+        {
+            return SubmitResult{.IsAccepted = false};
+        }
+    }
+
+    // Each enqueued action is assigned an integer index (logical sequence number),
+    // which we use as a key for tracking data structures and as an opaque id which
+    // may be used by clients to reference the scheduled action
+
+    const int32_t   ActionLsn = Action->ActionLsn;
+    const CbObject& ActionObj = Action->ActionObj;
+    const IoHash    ActionId  = ActionObj.GetHash();
+
+    MaybeDumpAction(ActionLsn, ActionObj);
+
+    // Enqueue job
+
+    CbObject Result;
+
+    HttpClient::Response   WorkResponse     = m_Http.Post("/jobs", ActionObj);
+    const HttpResponseCode WorkResponseCode = WorkResponse.StatusCode;
+
+    if (WorkResponseCode == HttpResponseCode::OK)
+    {
+        Result = WorkResponse.AsObject();
+    }
+    else if (WorkResponseCode == HttpResponseCode::NotFound)
+    {
+        // Not all attachments are present
+
+        // Build response package including all required attachments
+
+        CbPackage Pkg;
+        Pkg.SetObject(ActionObj);
+
+        CbObject Response = WorkResponse.AsObject();
+
+        for (auto& Item : Response["need"sv])
+        {
+            const IoHash NeedHash = Item.AsHash();
+
+            if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+            {
+                uint64_t         DataRawSize = 0;
+                IoHash           DataRawHash;
+                CompressedBuffer Compressed =
+                    CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+                ZEN_ASSERT(DataRawHash == NeedHash);
+
+                Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+            }
+            else
+            {
+                // No such attachment
+
+                return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
+            }
+        }
+
+        // Post resulting package
+
+        HttpClient::Response PayloadResponse = m_Http.Post("/jobs", Pkg);
+
+        if (!PayloadResponse)
+        {
+            ZEN_WARN("unable to register payloads for action {} at {}/jobs", ActionId, m_Http.GetBaseUri());
+
+            // TODO: include more information about the failure in the response
+
+            return {.IsAccepted = false, .Reason = "HTTP request failed"};
+        }
+        else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
+        {
+            Result = PayloadResponse.AsObject();
+        }
+        else
+        {
+            // Unexpected response. Note that the textual form is derived from the
+            // response code itself rather than from its integer value.
+
+            const int ResponseStatusCode = (int)PayloadResponse.StatusCode;
+
+            ZEN_WARN("unable to register payloads for action {} at {}/jobs (error: {} {})",
+                     ActionId,
+                     m_Http.GetBaseUri(),
+                     ResponseStatusCode,
+                     ToString(PayloadResponse.StatusCode));
+
+            return {.IsAccepted = false,
+                    .Reason     = fmt::format("unexpected response code {} {} from {}/jobs",
+                                          ResponseStatusCode,
+                                          ToString(PayloadResponse.StatusCode),
+                                          m_Http.GetBaseUri())};
+        }
+    }
+    else
+    {
+        // Neither accepted nor a request for attachments. Reject with a reason
+        // rather than silently.
+
+        const int ResponseStatusCode = (int)WorkResponseCode;
+
+        ZEN_WARN("unable to submit action {} at {}/jobs (error: {} {})",
+                 ActionId,
+                 m_Http.GetBaseUri(),
+                 ResponseStatusCode,
+                 ToString(WorkResponseCode));
+
+        return {.IsAccepted = false,
+                .Reason     = fmt::format("unexpected response code {} {} from {}/jobs",
+                                      ResponseStatusCode,
+                                      ToString(WorkResponseCode),
+                                      m_Http.GetBaseUri())};
+    }
+
+    if (Result)
+    {
+        if (const int32_t LsnField = Result["lsn"].AsInt32(0))
+        {
+            HttpRunningAction NewAction;
+            NewAction.Action          = Action;
+            NewAction.RemoteActionLsn = LsnField;
+
+            // The state is updated *before* the action is published in the running
+            // map. Once it is in the map the monitoring thread may retire it at any
+            // time, and a late transition to Running must not overwrite the final
+            // state it sets.
+
+            Action->SetActionState(RunnerAction::State::Running);
+
+            {
+                RwLock::ExclusiveLockScope _(m_RunningLock);
+
+                m_RemoteRunningMap[LsnField] = std::move(NewAction);
+            }
+
+            ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn);
+
+            return SubmitResult{.IsAccepted = true};
+        }
+    }
+
+    // The remote accepted the request but did not hand back a usable LSN
+
+    ZEN_WARN("no valid LSN in response for action {} from {}/jobs", ActionId, m_Http.GetBaseUri());
+
+    return {.IsAccepted = false, .Reason = "no valid LSN in response"};
+}
+
+// Probes the remote "/ready" endpoint; the runner is considered healthy when
+// the response evaluates as successful.
+bool
+RemoteHttpRunner::IsHealthy()
+{
+    HttpClient::Response Ready = m_Http.Get("/ready");
+
+    // TODO: use response to propagate context
+    return static_cast<bool>(Ready);
+}
+
+// Returns the number of actions currently tracked as running on the remote end
+size_t
+RemoteHttpRunner::GetSubmittedActionCount()
+{
+ RwLock::SharedLockScope _(m_RunningLock);
+ return m_RemoteRunningMap.size();
+}
+
+// Body of the monitoring thread. Periodically sweeps the remote end for
+// completed actions until m_MonitorThreadEnabled is cleared and the event is
+// signalled.
+//
+// The polling interval adapts to load: a quarter of the normal interval while
+// more than 16 actions are in flight, half if the previous sweep retired
+// anything, and the normal interval (one second) otherwise.
+void
+RemoteHttpRunner::MonitorThreadFunction()
+{
+ SetCurrentThreadName("RemoteHttpRunner_Monitor");
+
+ do
+ {
+ const int NormalWaitingTime = 1000;
+ int WaitTimeMs = NormalWaitingTime;
+ auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
+ auto SweepOnce = [&] {
+ const size_t RetiredCount = SweepRunningActions();
+
+ // Pick the wait time for the next iteration based on the current load
+ m_RunningLock.WithSharedLock([&] {
+ if (m_RemoteRunningMap.size() > 16)
+ {
+ WaitTimeMs = NormalWaitingTime / 4;
+ }
+ else
+ {
+ if (RetiredCount)
+ {
+ WaitTimeMs = NormalWaitingTime / 2;
+ }
+ else
+ {
+ WaitTimeMs = NormalWaitingTime;
+ }
+ }
+ });
+ };
+
+ // Keep sweeping for as long as the wait ends without the event being signalled
+ while (!WaitOnce())
+ {
+ SweepOnce();
+ }
+
+ // Signal received - this may mean we should quit
+
+ // One more sweep is performed before the enabled flag is checked
+ SweepOnce();
+ } while (m_MonitorThreadEnabled);
+}
+
+// Polls the remote end for completed jobs and retires the corresponding
+// actions.
+//
+// For every LSN the remote lists as completed, the job result is fetched and,
+// if the LSN is one we are tracking, the action is removed from the running
+// map and its result and final state are set. The remote's reported logical
+// processor count is also used to lower the limit on concurrently running
+// actions.
+//
+// Returns the number of actions retired by this sweep.
+size_t
+RemoteHttpRunner::SweepRunningActions()
+{
+    std::vector<HttpRunningAction> CompletedActions;
+
+    // Poll remote for list of completed actions
+
+    HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv);
+
+    if (CbObject Completed = ResponseCompleted.AsObject())
+    {
+        for (auto& FieldIt : Completed["completed"sv])
+        {
+            const int32_t CompleteLsn = FieldIt.AsInt32();
+
+            if (HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn)))
+            {
+                m_RunningLock.WithExclusiveLock([&] {
+                    if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end())
+                    {
+                        HttpRunningAction CompletedAction = std::move(CompleteIt->second);
+                        CompletedAction.ActionResults     = ResponseJob.AsPackage();
+                        CompletedAction.Success           = true;
+
+                        CompletedActions.push_back(std::move(CompletedAction));
+                        m_RemoteRunningMap.erase(CompleteIt);
+                    }
+                    else
+                    {
+                        // we received a completion notice for an action we don't know about,
+                        // this can happen if the runner is used by multiple upstream schedulers,
+                        // or if this compute node was recently restarted and lost track of
+                        // previously scheduled actions
+                    }
+                });
+            }
+        }
+
+        if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView())
+        {
+            // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0))
+            if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0))
+            {
+                const int32_t NewCap = zen::Max(4, CpuCount);
+
+                // The limit is only ever lowered here, never raised
+                if (m_MaxRunningActions > NewCap)
+                {
+                    ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions);
+
+                    // The limit is read by other threads while they hold m_RunningLock
+                    // (see QueryCapacity/SubmitAction), so it is updated under the
+                    // exclusive lock
+                    m_RunningLock.WithExclusiveLock([&] { m_MaxRunningActions = NewCap; });
+                }
+            }
+        }
+    }
+
+    // Notify outer. Note that this has to be done without holding any local locks
+    // otherwise we may end up with deadlocks.
+
+    for (HttpRunningAction& HttpAction : CompletedActions)
+    {
+        const int ActionLsn = HttpAction.Action->ActionLsn;
+
+        if (HttpAction.Success)
+        {
+            ZEN_DEBUG("completed: {} LSN {} (remote LSN {})", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn);
+
+            // The result is attached before the state change, so that anyone
+            // observing the Completed state also finds the result in place
+            HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
+            HttpAction.Action->SetActionState(RunnerAction::State::Completed);
+        }
+        else
+        {
+            HttpAction.Action->SetActionState(RunnerAction::State::Failed);
+        }
+    }
+
+    return CompletedActions.size();
+}
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/remotehttprunner.h b/src/zencompute/remotehttprunner.h
new file mode 100644
index 000000000..1e885da3d
--- /dev/null
+++ b/src/zencompute/remotehttprunner.h
@@ -0,0 +1,80 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+
+# include <zencore/compactbinarypackage.h>
+# include <zencore/logging.h>
+# include <zencore/zencore.h>
+# include <zenhttp/httpclient.h>
+
+# include <atomic>
+# include <filesystem>
+# include <thread>
+
+namespace zen {
+class CidStore;
+}
+
+namespace zen::compute {
+
+/** HTTP-based runner
+
+ This implements a DDC remote compute execution strategy via REST API
+
+ */
+
+class RemoteHttpRunner : public FunctionRunner
+{
+ // Move construction/assignment are explicitly disabled
+ RemoteHttpRunner(RemoteHttpRunner&&) = delete;
+ RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;
+
+public:
+ // HostName is used as the prefix of the base URL ("<HostName>/apply")
+ RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName);
+ ~RemoteHttpRunner();
+
+ // FunctionRunner interface
+ virtual void Shutdown() override;
+ virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
+ [[nodiscard]] virtual bool IsHealthy() override;
+ [[nodiscard]] virtual size_t GetSubmittedActionCount() override;
+ [[nodiscard]] virtual size_t QueryCapacity() override;
+ [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
+
+protected:
+ LoggerRef Log() { return m_Log; }
+
+private:
+ LoggerRef m_Log;
+ ChunkResolver& m_ChunkResolver;
+ // Declared before m_Http since the client is constructed from it
+ std::string m_BaseUrl;
+ HttpClient m_Http;
+
+ // Upper bound on concurrently tracked remote actions. May be lowered by the
+ // monitoring thread based on metrics reported by the remote end.
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+
+ // Book-keeping for a single action submitted to the remote end
+ struct HttpRunningAction
+ {
+ Ref<RunnerAction> Action;
+ int RemoteActionLsn = 0; // Remote LSN
+ bool Success = false;
+ CbPackage ActionResults;
+ };
+
+ RwLock m_RunningLock;
+ std::unordered_map<int, HttpRunningAction> m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn
+
+ // Background monitoring thread state
+ std::thread m_MonitorThread;
+ std::atomic<bool> m_MonitorThreadEnabled{true};
+ Event m_MonitorThreadEvent;
+ void MonitorThreadFunction();
+ // Returns the number of actions retired by the sweep
+ size_t SweepRunningActions();
+};
+
+} // namespace zen::compute
+
+#endif
diff --git a/src/zencompute/xmake.lua b/src/zencompute/xmake.lua
new file mode 100644
index 000000000..c710b662d
--- /dev/null
+++ b/src/zencompute/xmake.lua
@@ -0,0 +1,11 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+-- Build target for the zencompute module
+target('zencompute')
+ -- built as a static library
+ set_kind("static")
+ set_group("libs")
+ -- all headers and sources below this directory belong to the target
+ add_headerfiles("**.h")
+ add_files("**.cpp")
+ -- "include" is exported to targets depending on this one
+ add_includedirs("include", {public=true})
+ add_deps("zencore", "zenstore", "zenutil", "zennet", "zenhttp")
+ add_packages("vcpkg::gsl-lite")
+ add_packages("vcpkg::spdlog", "vcpkg::cxxopts")
diff --git a/src/zencompute/zencompute.cpp b/src/zencompute/zencompute.cpp
new file mode 100644
index 000000000..633250f4e
--- /dev/null
+++ b/src/zencompute/zencompute.cpp
@@ -0,0 +1,12 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/zencompute.h"
+
+namespace zen {
+
+// Intentionally empty.
+// NOTE(review): presumably exists so that referencing it forces this static
+// library to be linked in -- confirm against the callers.
+void
+zencompute_forcelinktests()
+{
+}
+
+} // namespace zen