diff options
| author | zousar <[email protected]> | 2026-02-18 23:19:14 -0700 |
|---|---|---|
| committer | zousar <[email protected]> | 2026-02-18 23:19:14 -0700 |
| commit | 2ba28acaf034722452f82cfb07afc0a4bb90eeab (patch) | |
| tree | c00dea385597180673be6e02aca6c07d9ef6ec00 /src/zencompute | |
| parent | updatefrontend (diff) | |
| parent | structured compute basics (#714) (diff) | |
| download | zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.tar.xz zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.zip | |
Merge branch 'main' into zs/web-ui-improvements
Diffstat (limited to 'src/zencompute')
| -rw-r--r-- | src/zencompute/actionrecorder.cpp | 258 | ||||
| -rw-r--r-- | src/zencompute/actionrecorder.h | 91 | ||||
| -rw-r--r-- | src/zencompute/functionrunner.cpp | 112 | ||||
| -rw-r--r-- | src/zencompute/functionrunner.h | 207 | ||||
| -rw-r--r-- | src/zencompute/functionservice.cpp | 957 | ||||
| -rw-r--r-- | src/zencompute/httpfunctionservice.cpp | 709 | ||||
| -rw-r--r-- | src/zencompute/httporchestrator.cpp | 81 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/functionservice.h | 132 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/httpfunctionservice.h | 73 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/httporchestrator.h | 44 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/recordingreader.h | 127 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/zencompute.h | 11 | ||||
| -rw-r--r-- | src/zencompute/localrunner.cpp | 722 | ||||
| -rw-r--r-- | src/zencompute/localrunner.h | 100 | ||||
| -rw-r--r-- | src/zencompute/recordingreader.cpp | 335 | ||||
| -rw-r--r-- | src/zencompute/remotehttprunner.cpp | 457 | ||||
| -rw-r--r-- | src/zencompute/remotehttprunner.h | 80 | ||||
| -rw-r--r-- | src/zencompute/xmake.lua | 11 | ||||
| -rw-r--r-- | src/zencompute/zencompute.cpp | 12 |
19 files changed, 4519 insertions, 0 deletions
diff --git a/src/zencompute/actionrecorder.cpp b/src/zencompute/actionrecorder.cpp new file mode 100644 index 000000000..04c4b5141 --- /dev/null +++ b/src/zencompute/actionrecorder.cpp @@ -0,0 +1,258 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "actionrecorder.h" + +#include "functionrunner.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# define ZEN_CONCRT_AVAILABLE 1 +#else +# define ZEN_CONCRT_AVAILABLE 0 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RecordingFileWriter::RecordingFileWriter() +{ +} + +RecordingFileWriter::~RecordingFileWriter() +{ + Close(); +} + +void +RecordingFileWriter::Open(std::filesystem::path FilePath) +{ + using namespace std::literals; + + m_File.Open(FilePath, BasicFile::Mode::kTruncate); + m_File.Write("----DDC2----DATA", 16, 0); + m_FileOffset = 16; + + std::filesystem::path TocPath = FilePath.replace_extension(".ztoc"); + m_TocFile.Open(TocPath, BasicFile::Mode::kTruncate); + + m_TocWriter << "version"sv << 1; + m_TocWriter.BeginArray("toc"sv); +} + +void +RecordingFileWriter::Close() +{ + m_TocWriter.EndArray(); + CbObject Toc = m_TocWriter.Save(); + + std::error_code Ec; + m_TocFile.WriteAll(Toc.GetBuffer().AsIoBuffer(), Ec); +} + +void +RecordingFileWriter::AppendObject(const CbObject& Object, const IoHash& ObjectHash) +{ + RwLock::ExclusiveLockScope _(m_FileLock); + + MemoryView ObjectView = Object.GetBuffer().GetView(); + + std::error_code Ec; + m_File.Write(ObjectView, m_FileOffset, Ec); + + if (Ec) + { + throw std::system_error(Ec, "failed writing to archive"); + } + + m_TocWriter.BeginArray(); + m_TocWriter.AddHash(ObjectHash); + 
m_TocWriter.AddInteger(m_FileOffset); + m_TocWriter.AddInteger(gsl::narrow<int>(ObjectView.GetSize())); + m_TocWriter.EndArray(); + + m_FileOffset += ObjectView.GetSize(); +} + +////////////////////////////////////////////////////////////////////////// + +ActionRecorder::ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath) +: m_ChunkResolver(InChunkResolver) +, m_RecordingLogDir(RecordingLogPath) +{ + std::error_code Ec; + CreateDirectories(m_RecordingLogDir, Ec); + + if (Ec) + { + ZEN_WARN("Could not create directory '{}': {}", m_RecordingLogDir, Ec.message()); + } + + CleanDirectory(m_RecordingLogDir, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Could not clean directory '{}': {}", m_RecordingLogDir, Ec.message()); + } + + m_WorkersFile.Open(m_RecordingLogDir / "workers.zdat"); + m_ActionsFile.Open(m_RecordingLogDir / "actions.zdat"); + + CidStoreConfiguration CidConfig; + CidConfig.RootDirectory = m_RecordingLogDir / "cid"; + CidConfig.HugeValueThreshold = 128 * 1024 * 1024; + + m_CidStore.Initialize(CidConfig); +} + +ActionRecorder::~ActionRecorder() +{ + Shutdown(); +} + +void +ActionRecorder::Shutdown() +{ + m_CidStore.Flush(); +} + +void +ActionRecorder::RegisterWorker(const CbPackage& WorkerPackage) +{ + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + + m_WorkersFile.AppendObject(WorkerPackage.GetObject(), WorkerId); + + std::unordered_set<IoHash> AddedChunks; + uint64_t AddedBytes = 0; + + // First add all attachments from the worker package itself + + for (const CbAttachment& Attachment : WorkerPackage.GetAttachments()) + { + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + IoBuffer Data = Buffer.GetCompressed().Flatten().AsIoBuffer(); + + const IoHash ChunkHash = Buffer.DecodeRawHash(); + + CidStore::InsertResult Result = m_CidStore.AddChunk(Data, ChunkHash, CidStore::InsertMode::kCopyOnly); + + AddedChunks.insert(ChunkHash); + + if (Result.New) + { + AddedBytes += 
Data.GetSize(); + } + } + + // Not all attachments will be present in the worker package, so we need to add + // all referenced chunks to ensure that the recording is self-contained and not + // referencing data in the main CID store + + CbObject WorkerDescriptor = WorkerPackage.GetObject(); + + WorkerDescriptor.IterateAttachments([&](const CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + + if (!AddedChunks.contains(AttachmentCid)) + { + IoBuffer AttachmentData = m_ChunkResolver.FindChunkByCid(AttachmentCid); + + if (AttachmentData) + { + CidStore::InsertResult Result = m_CidStore.AddChunk(AttachmentData, AttachmentCid, CidStore::InsertMode::kCopyOnly); + + if (Result.New) + { + AddedBytes += AttachmentData.GetSize(); + } + } + else + { + ZEN_WARN("RegisterWorker: could not resolve attachment chunk {} for worker {}", AttachmentCid, WorkerId); + } + + AddedChunks.insert(AttachmentCid); + } + }); + + ZEN_INFO("recorded worker {} with {} attachments ({} bytes)", WorkerId, AddedChunks.size(), AddedBytes); +} + +bool +ActionRecorder::RecordAction(Ref<RunnerAction> Action) +{ + bool AllGood = true; + + Action->ActionObj.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsHash(); + IoBuffer ChunkData = m_ChunkResolver.FindChunkByCid(AttachData); + + if (ChunkData) + { + if (ChunkData.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash DecompressedHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), /* out */ DecompressedHash, /* out*/ RawSize); + + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + if (Compressed.TryGetCompressParameters(/* out */ Compressor, /* out */ CompressionLevel, /* out */ BlockSize)) + { + if (Compressor == OodleCompressor::NotSet) + { + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + CompressedBuffer NewCompressed = 
CompressedBuffer::Compress(std::move(Decompressed), + OodleCompressor::Mermaid, + OodleCompressionLevel::Fast, + BlockSize); + + ChunkData = NewCompressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + } + + const uint64_t ChunkSize = ChunkData.GetSize(); + + m_CidStore.AddChunk(ChunkData, AttachData, CidStore::InsertMode::kCopyOnly); + ++m_ChunkCounter; + m_ChunkBytesCounter.fetch_add(ChunkSize); + } + else + { + AllGood = false; + + ZEN_WARN("could not resolve chunk {}", AttachData); + } + }); + + if (AllGood) + { + m_ActionsFile.AppendObject(Action->ActionObj, Action->ActionId); + ++m_ActionsCounter; + + return true; + } + else + { + return false; + } +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/actionrecorder.h b/src/zencompute/actionrecorder.h new file mode 100644 index 000000000..9cc2b44a2 --- /dev/null +++ b/src/zencompute/actionrecorder.h @@ -0,0 +1,91 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencompute/functionservice.h> +#include <zencompute/zencompute.h> +#include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> +#include <zenstore/cidstore.h> +#include <zenstore/gc.h> +#include <zenstore/zenstore.h> + +#include <filesystem> +#include <functional> +#include <map> +#include <unordered_map> + +namespace zen { +class CbObject; +class CbPackage; +struct IoHash; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +struct RecordingFileWriter +{ + RecordingFileWriter(RecordingFileWriter&&) = delete; + RecordingFileWriter& operator=(RecordingFileWriter&&) = delete; + + RwLock m_FileLock; + BasicFile m_File; + uint64_t m_FileOffset = 0; + CbObjectWriter m_TocWriter; + BasicFile m_TocFile; + + RecordingFileWriter(); + ~RecordingFileWriter(); + + void Open(std::filesystem::path FilePath); + void Close(); + void AppendObject(const CbObject& 
Object, const IoHash& ObjectHash); +}; + +////////////////////////////////////////////////////////////////////////// + +/** + * Recording "runner" implementation + * + * This class writes out all actions and their attachments to a recording directory + * in a format that can be read back by the RecordingReader. + * + * The contents of the recording directory will be self-contained, with all referenced + * attachments stored in the recording directory itself, so that the recording can be + * moved or shared without needing to maintain references to the main CID store. + * + */ + +class ActionRecorder +{ +public: + ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath); + ~ActionRecorder(); + + ActionRecorder(const ActionRecorder&) = delete; + ActionRecorder& operator=(const ActionRecorder&) = delete; + + void Shutdown(); + void RegisterWorker(const CbPackage& WorkerPackage); + bool RecordAction(Ref<RunnerAction> Action); + +private: + ChunkResolver& m_ChunkResolver; + std::filesystem::path m_RecordingLogDir; + + RecordingFileWriter m_WorkersFile; + RecordingFileWriter m_ActionsFile; + GcManager m_Gc; + CidStore m_CidStore{m_Gc}; + std::atomic<int> m_ChunkCounter{0}; + std::atomic<uint64_t> m_ChunkBytesCounter{0}; + std::atomic<int> m_ActionsCounter{0}; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/functionrunner.cpp b/src/zencompute/functionrunner.cpp new file mode 100644 index 000000000..8e7c12b2b --- /dev/null +++ b/src/zencompute/functionrunner.cpp @@ -0,0 +1,112 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "functionrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/filesystem.h> + +# include <fmt/format.h> +# include <vector> + +namespace zen::compute { + +FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions") +{ +} + +FunctionRunner::~FunctionRunner() = default; + +size_t +FunctionRunner::QueryCapacity() +{ + return 1; +} + +std::vector<SubmitResult> +FunctionRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + Results.reserve(Actions.size()); + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +void +FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject) +{ + if (m_DumpActions) + { + std::string UniqueId = fmt::format("{}.ddb", ActionLsn); + std::filesystem::path Path = m_ActionsPath / UniqueId; + + zen::WriteFile(Path, IoBuffer(ActionObject.GetBuffer().AsIoBuffer())); + } +} + +////////////////////////////////////////////////////////////////////////// + +RunnerAction::RunnerAction(FunctionServiceSession* OwnerSession) : m_OwnerSession(OwnerSession) +{ + this->Timestamps[static_cast<int>(State::New)] = DateTime::Now().GetTicks(); +} + +RunnerAction::~RunnerAction() +{ +} + +void +RunnerAction::SetActionState(State NewState) +{ + ZEN_ASSERT(NewState < State::_Count); + this->Timestamps[static_cast<int>(NewState)] = DateTime::Now().GetTicks(); + + do + { + if (State CurrentState = m_ActionState.load(); CurrentState == NewState) + { + // No state change + return; + } + else + { + if (NewState <= CurrentState) + { + // Cannot transition to an earlier or same state + return; + } + + if (m_ActionState.compare_exchange_strong(CurrentState, NewState)) + { + // Successful state change + + m_OwnerSession->PostUpdate(this); + + return; + } + } + } while (true); +} + +void +RunnerAction::SetResult(CbPackage&& Result) 
+{ + m_Result = std::move(Result); +} + +CbPackage& +RunnerAction::GetResult() +{ + ZEN_ASSERT(IsCompleted()); + return m_Result; +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h new file mode 100644 index 000000000..6fd0d84cc --- /dev/null +++ b/src/zencompute/functionrunner.h @@ -0,0 +1,207 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencompute/functionservice.h> + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <filesystem> +# include <vector> + +namespace zen::compute { + +struct SubmitResult +{ + bool IsAccepted = false; + std::string Reason; +}; + +/** Base interface for classes implementing a remote execution "runner" + */ +class FunctionRunner : public RefCounted +{ + FunctionRunner(FunctionRunner&&) = delete; + FunctionRunner& operator=(FunctionRunner&&) = delete; + +public: + FunctionRunner(std::filesystem::path BasePath); + virtual ~FunctionRunner() = 0; + + virtual void Shutdown() = 0; + virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; + + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0; + [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; + [[nodiscard]] virtual bool IsHealthy() = 0; + [[nodiscard]] virtual size_t QueryCapacity(); + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); + +protected: + std::filesystem::path m_ActionsPath; + bool m_DumpActions = false; + void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject); +}; + +template<typename RunnerType> +struct RunnerGroup +{ + void AddRunner(RunnerType* Runner) + { + m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); + } + size_t QueryCapacity() + { + size_t TotalCapacity = 0; + m_RunnersLock.WithSharedLock([&] { + for (const auto& Runner : m_Runners) + { + TotalCapacity += Runner->QueryCapacity(); + } + }); + return TotalCapacity; + } + + SubmitResult SubmitAction(Ref<RunnerAction> Action) + { + RwLock::SharedLockScope _(m_RunnersLock); + + const int InitialIndex = 
m_NextSubmitIndex.load(std::memory_order_acquire); + int Index = InitialIndex; + const int RunnerCount = gsl::narrow<int>(m_Runners.size()); + + if (RunnerCount == 0) + { + return {.IsAccepted = false, .Reason = "No runners available"}; + } + + do + { + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + + auto& Runner = m_Runners[Index++]; + + SubmitResult Result = Runner->SubmitAction(Action); + + if (Result.IsAccepted == true) + { + m_NextSubmitIndex = Index % RunnerCount; + + return Result; + } + + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + } while (Index != InitialIndex); + + return {.IsAccepted = false}; + } + + size_t GetSubmittedActionCount() + { + RwLock::SharedLockScope _(m_RunnersLock); + + size_t TotalCount = 0; + + for (const auto& Runner : m_Runners) + { + TotalCount += Runner->GetSubmittedActionCount(); + } + + return TotalCount; + } + + void RegisterWorker(CbPackage Worker) + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->RegisterWorker(Worker); + } + } + + void Shutdown() + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->Shutdown(); + } + } + +private: + RwLock m_RunnersLock; + std::vector<Ref<RunnerType>> m_Runners; + std::atomic<int> m_NextSubmitIndex{0}; +}; + +/** + * This represents an action going through different stages of scheduling and execution. 
+ */ +struct RunnerAction : public RefCounted +{ + explicit RunnerAction(FunctionServiceSession* OwnerSession); + ~RunnerAction(); + + int ActionLsn = 0; + WorkerDesc Worker; + IoHash ActionId; + CbObject ActionObj; + int Priority = 0; + + enum class State + { + New, + Pending, + Running, + Completed, + Failed, + _Count + }; + + static const char* ToString(State _) + { + switch (_) + { + case State::New: + return "New"; + case State::Pending: + return "Pending"; + case State::Running: + return "Running"; + case State::Completed: + return "Completed"; + case State::Failed: + return "Failed"; + default: + return "Unknown"; + } + } + + uint64_t Timestamps[static_cast<int>(State::_Count)] = {}; + + State ActionState() const { return m_ActionState; } + void SetActionState(State NewState); + + bool IsSuccess() const { return ActionState() == State::Completed; } + bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; } + + void SetResult(CbPackage&& Result); + CbPackage& GetResult(); + +private: + std::atomic<State> m_ActionState = State::New; + FunctionServiceSession* m_OwnerSession = nullptr; + CbPackage m_Result; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp new file mode 100644 index 000000000..0698449e9 --- /dev/null +++ b/src/zencompute/functionservice.cpp @@ -0,0 +1,957 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" +# include "actionrecorder.h" +# include "localrunner.h" +# include "remotehttprunner.h" + +# include <zencompute/recordingreader.h> +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zentelemetry/stats.h> + +# include <set> +# include <deque> +# include <map> +# include <thread> +# include <unordered_map> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <EASTL/hash_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace std::literals; + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +struct FunctionServiceSession::Impl +{ + FunctionServiceSession* m_FunctionServiceSession; + ChunkResolver& m_ChunkResolver; + LoggerRef m_Log{logging::Get("apply")}; + + Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver) + : m_FunctionServiceSession(InFunctionServiceSession) + , m_ChunkResolver(InChunkResolver) + { + m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this}; + } + + void Shutdown(); + bool IsHealthy(); + + LoggerRef Log() { return m_Log; } + + std::atomic_bool m_AcceptActions = true; + + struct FunctionDefinition + { + std::string FunctionName; + Guid FunctionVersion; + Guid BuildSystemVersion; + IoHash WorkerId; + }; + + void 
EmitStats(CbObjectWriter& Cbo) + { + m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); }); + m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); }); + m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); }); + Cbo << "actions_submitted"sv << GetSubmittedActionCount(); + EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo); + } + + void RegisterWorker(CbPackage Worker); + WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); + + std::atomic<int32_t> m_ActionsCounter = 0; // sequence number + + RwLock m_PendingLock; + std::map<int, Ref<RunnerAction>> m_PendingActions; + + RwLock m_RunningLock; + std::unordered_map<int, Ref<RunnerAction>> m_RunningMap; + + RwLock m_ResultsLock; + std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap; + metrics::Meter m_ResultRate; + std::atomic<uint64_t> m_RetiredCount{0}; + + HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); + HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); + + std::atomic<bool> m_ShutdownRequested{false}; + + std::thread m_SchedulingThread; + std::atomic<bool> m_SchedulingThreadEnabled{true}; + Event m_SchedulingThreadEvent; + + void MonitorThreadFunction(); + void SchedulePendingActions(); + + // Workers + + RwLock m_WorkerLock; + std::unordered_map<IoHash, CbPackage> m_WorkerMap; + std::vector<FunctionDefinition> m_FunctionList; + std::vector<IoHash> GetKnownWorkerIds(); + + // Runners + + RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup; + RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup; + + EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); + EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority); + + void GetCompleted(CbWriter& Cbo); + + // Recording + + void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); + void StopRecording(); + + 
std::unique_ptr<ActionRecorder> m_Recorder; + + // History tracking + + RwLock m_ActionHistoryLock; + std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory; + size_t m_HistoryLimit = 1000; + + std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit); + + // + + [[nodiscard]] size_t QueryCapacity(); + + [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action); + [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); + [[nodiscard]] size_t GetSubmittedActionCount(); + + // Updates + + RwLock m_UpdatedActionsLock; + std::vector<Ref<RunnerAction>> m_UpdatedActions; + + void HandleActionUpdates(); + void PostUpdate(RunnerAction* Action); + + void ShutdownRunners(); +}; + +bool +FunctionServiceSession::Impl::IsHealthy() +{ + return true; +} + +void +FunctionServiceSession::Impl::Shutdown() +{ + m_AcceptActions = false; + m_ShutdownRequested = true; + + m_SchedulingThreadEnabled = false; + m_SchedulingThreadEvent.Set(); + if (m_SchedulingThread.joinable()) + { + m_SchedulingThread.join(); + } + + ShutdownRunners(); +} + +void +FunctionServiceSession::Impl::ShutdownRunners() +{ + m_LocalRunnerGroup.Shutdown(); + m_RemoteRunnerGroup.Shutdown(); +} + +void +FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) +{ + ZEN_INFO("starting recording to '{}'", RecordingPath); + + m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath); + + ZEN_INFO("started recording to '{}'", RecordingPath); +} + +void +FunctionServiceSession::Impl::StopRecording() +{ + ZEN_INFO("stopping recording"); + + m_Recorder = nullptr; + + ZEN_INFO("stopped recording"); +} + +std::vector<FunctionServiceSession::ActionHistoryEntry> +FunctionServiceSession::Impl::GetActionHistory(int Limit) +{ + RwLock::SharedLockScope _(m_ActionHistoryLock); + + if (Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size()) + { + return 
std::vector<ActionHistoryEntry>(m_ActionHistory.end() - Limit, m_ActionHistory.end()); + } + + return std::vector<ActionHistoryEntry>(m_ActionHistory.begin(), m_ActionHistory.end()); +} + +void +FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker) +{ + RwLock::ExclusiveLockScope _(m_WorkerLock); + + const IoHash& WorkerId = Worker.GetObject().GetHash(); + + if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second) + { + // Note that since the convention currently is that WorkerId is equal to the hash + // of the worker descriptor there is no chance that we get a second write with a + // different descriptor. Thus we only need to call this the first time, when the + // worker is added + + m_LocalRunnerGroup.RegisterWorker(Worker); + m_RemoteRunnerGroup.RegisterWorker(Worker); + + if (m_Recorder) + { + m_Recorder->RegisterWorker(Worker); + } + + CbObject WorkerObj = Worker.GetObject(); + + // Populate worker database + + const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerObj["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } + } +} + +WorkerDesc +FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId) +{ + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) + { + const CbPackage& Desc = It->second; + return {Desc, WorkerId}; + } + + return {}; +} + +std::vector<IoHash> +FunctionServiceSession::Impl::GetKnownWorkerIds() +{ + std::vector<IoHash> WorkerIds; + WorkerIds.reserve(m_WorkerMap.size()); + + m_WorkerLock.WithSharedLock([&] { + for (const auto& [WorkerId, _] : 
m_WorkerMap) + { + WorkerIds.push_back(WorkerId); + } + }); + + return WorkerIds; +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority) +{ + // Resolve function to worker + + IoHash WorkerId{IoHash::Zero}; + + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + for (const FunctionDefinition& FuncDef : m_FunctionList) + { + if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion && + FuncDef.BuildSystemVersion == BuildSystemVersion) + { + WorkerId = FuncDef.WorkerId; + + break; + } + } + + if (WorkerId == IoHash::Zero) + { + CbObjectWriter Writer; + + Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; + Writer << "error" + << "no worker matches the action specification"; + + return {0, Writer.Save()}; + } + + if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) + { + CbPackage WorkerPackage = It->second; + + return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority); + } + + CbObjectWriter Writer; + + Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; + Writer << "error" + << "no worker found despite match"; + + return {0, Writer.Save()}; +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) +{ + const int ActionLsn = ++m_ActionsCounter; + + Ref<RunnerAction> Pending{new RunnerAction(m_FunctionServiceSession)}; + + Pending->ActionLsn = ActionLsn; + Pending->Worker = Worker; + Pending->ActionId = ActionObj.GetHash(); + Pending->ActionObj = ActionObj; + Pending->Priority = RequestPriority; + + 
SubmitResult SubResult = SubmitAction(Pending); + + if (SubResult.IsAccepted) + { + // Great, the job is being taken care of by the runner + ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn); + } + else + { + ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); + + Pending->SetActionState(RunnerAction::State::Pending); + } + + if (m_Recorder) + { + m_Recorder->RecordAction(Pending); + } + + CbObjectWriter Writer; + Writer << "lsn" << Pending->ActionLsn; + Writer << "worker" << Pending->Worker.WorkerId; + Writer << "action" << Pending->ActionId; + + return {Pending->ActionLsn, Writer.Save()}; +} + +SubmitResult +FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action) +{ + // Loosely round-robin scheduling of actions across runners. + // + // It's not entirely clear what this means given that submits + // can come in across multiple threads, but it's probably better + // than always starting with the first runner. + // + // Longer term we should track the state of the individual + // runners and make decisions accordingly. 
+ + SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action); + if (Result.IsAccepted) + { + return Result; + } + + return m_RemoteRunnerGroup.SubmitAction(Action); +} + +size_t +FunctionServiceSession::Impl::GetSubmittedActionCount() +{ + return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount(); +} + +HttpResponseCode +FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) +{ + // This lock is held for the duration of the function since we need to + // be sure that the action doesn't change state while we are checking the + // different data structures + + RwLock::ExclusiveLockScope _(m_ResultsLock); + + if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) + { + OutResultPackage = std::move(It->second->GetResult()); + + m_ResultsMap.erase(It); + + return HttpResponseCode::OK; + } + + { + RwLock::SharedLockScope __(m_PendingLock); + + if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) + { + return HttpResponseCode::Accepted; + } + } + + // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must + // always be taken after m_ResultsLock if both are needed + + { + RwLock::SharedLockScope __(m_RunningLock); + + if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) + { + return HttpResponseCode::Accepted; + } + } + + return HttpResponseCode::NotFound; +} + +HttpResponseCode +FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) +{ + // This lock is held for the duration of the function since we need to + // be sure that the action doesn't change state while we are checking the + // different data structures + + RwLock::ExclusiveLockScope _(m_ResultsLock); + + for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) + { + if (It->second->ActionId == ActionId) + { + OutResultPackage = std::move(It->second->GetResult()); + + m_ResultsMap.erase(It); + + 
return HttpResponseCode::OK; + } + } + + { + RwLock::SharedLockScope __(m_PendingLock); + + for (const auto& [K, Pending] : m_PendingActions) + { + if (Pending->ActionId == ActionId) + { + return HttpResponseCode::Accepted; + } + } + } + + // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must + // always be taken after m_ResultsLock if both are needed + + { + RwLock::SharedLockScope __(m_RunningLock); + + for (const auto& [K, v] : m_RunningMap) + { + if (v->ActionId == ActionId) + { + return HttpResponseCode::Accepted; + } + } + } + + return HttpResponseCode::NotFound; +} + +void +FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo) +{ + Cbo.BeginArray("completed"); + + m_ResultsLock.WithSharedLock([&] { + for (auto& Kv : m_ResultsMap) + { + Cbo << Kv.first; + } + }); + + Cbo.EndArray(); +} + +# define ZEN_BATCH_SCHEDULER 1 + +void +FunctionServiceSession::Impl::SchedulePendingActions() +{ + int ScheduledCount = 0; + size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); + size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); + size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); + + static Stopwatch DumpRunningTimer; + + auto _ = MakeGuard([&] { + ZEN_INFO("scheduled {} pending actions. 
{} running ({} retired), {} still pending, {} results", + ScheduledCount, + RunningCount, + m_RetiredCount.load(), + PendingCount, + ResultCount); + + if (DumpRunningTimer.GetElapsedTimeMs() > 30000) + { + DumpRunningTimer.Reset(); + + std::set<int> RunningList; + m_RunningLock.WithSharedLock([&] { + for (auto& [K, V] : m_RunningMap) + { + RunningList.insert(K); + } + }); + + ExtendableStringBuilder<1024> RunningString; + for (int i : RunningList) + { + if (RunningString.Size()) + { + RunningString << ", "; + } + + RunningString.Append(IntNum(i)); + } + + ZEN_INFO("running: {}", RunningString); + } + }); + +# if ZEN_BATCH_SCHEDULER + size_t Capacity = QueryCapacity(); + + if (!Capacity) + { + _.Dismiss(); + + return; + } + + std::vector<Ref<RunnerAction>> ActionsToSchedule; + + // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock + + m_PendingLock.WithExclusiveLock([&] { + if (m_ShutdownRequested) + { + return; + } + + if (m_PendingActions.empty()) + { + return; + } + + size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size()); + + auto PendingIt = m_PendingActions.begin(); + const auto PendingEnd = m_PendingActions.end(); + + while (NumActionsToSchedule && PendingIt != PendingEnd) + { + const Ref<RunnerAction>& Pending = PendingIt->second; + + switch (Pending->ActionState()) + { + case RunnerAction::State::Pending: + ActionsToSchedule.push_back(Pending); + break; + + case RunnerAction::State::Running: + case RunnerAction::State::Completed: + case RunnerAction::State::Failed: + break; + + default: + case RunnerAction::State::New: + ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn); + break; + } + + ++PendingIt; + --NumActionsToSchedule; + } + + PendingCount = m_PendingActions.size(); + }); + + if (ActionsToSchedule.empty()) + { + _.Dismiss(); + return; + } + + ZEN_INFO("attempting schedule of {} pending actions", 
ActionsToSchedule.size()); + + auto SubmitResults = SubmitActions(ActionsToSchedule); + + // Move successfully scheduled actions to the running map and remove + // from pending queue. It's actually possible that by the time we get + // to this stage some of the actions may have already completed, so + // they should not always be added to the running map + + eastl::hash_set<int> ScheduledActions; + + for (size_t i = 0; i < ActionsToSchedule.size(); ++i) + { + const Ref<RunnerAction>& Pending = ActionsToSchedule[i]; + const SubmitResult& SubResult = SubmitResults[i]; + + if (SubResult.IsAccepted) + { + ScheduledActions.insert(Pending->ActionLsn); + } + } + + ScheduledCount += (int)ActionsToSchedule.size(); + +# else + m_PendingLock.WithExclusiveLock([&] { + while (!m_PendingActions.empty()) + { + if (m_ShutdownRequested) + { + return; + } + + // Here it would be good if we could decide to pop immediately to avoid + // holding the lock while creating processes etc + const Ref<RunnerAction>& Pending = m_PendingActions.begin()->second; + FunctionRunner::SubmitResult SubResult = SubmitAction(Pending); + + if (SubResult.IsAccepted) + { + // Great, the job is being taken care of by the runner + + ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn); + + m_RunningLock.WithExclusiveLock([&] { + m_RunningMap.insert({Pending->ActionLsn, Pending}); + + RunningCount = m_RunningMap.size(); + }); + + m_PendingActions.pop_front(); + + PendingCount = m_PendingActions.size(); + ++ScheduledCount; + } + else + { + // Runner could not accept the job, leave it on the pending queue + + return; + } + } + }); +# endif +} + +void +FunctionServiceSession::Impl::MonitorThreadFunction() +{ + SetCurrentThreadName("FunctionServiceSession_Monitor"); + + auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); + + do + { + int TimeoutMs = 1000; + + if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); })) + { + TimeoutMs = 100; + } + + 
const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs); + + if (m_SchedulingThreadEnabled == false) + { + return; + } + + HandleActionUpdates(); + + // Schedule pending actions + + SchedulePendingActions(); + + if (!Timedout) + { + m_SchedulingThreadEvent.Reset(); + } + } while (m_SchedulingThreadEnabled); +} + +void +FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action) +{ + m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); }); +} + +void +FunctionServiceSession::Impl::HandleActionUpdates() +{ + std::vector<Ref<RunnerAction>> UpdatedActions; + + m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); }); + + std::unordered_set<int> SeenLsn; + std::unordered_set<int> RunningLsn; + + for (Ref<RunnerAction>& Action : UpdatedActions) + { + const int ActionLsn = Action->ActionLsn; + + if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted) + { + switch (Action->ActionState()) + { + case RunnerAction::State::Pending: + m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); + break; + + case RunnerAction::State::Running: + m_PendingLock.WithExclusiveLock([&] { + m_RunningLock.WithExclusiveLock([&] { + m_RunningMap.insert({ActionLsn, Action}); + m_PendingActions.erase(ActionLsn); + }); + }); + ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); + break; + + case RunnerAction::State::Completed: + case RunnerAction::State::Failed: + m_ResultsLock.WithExclusiveLock([&] { + m_ResultsMap[ActionLsn] = Action; + + m_PendingLock.WithExclusiveLock([&] { + m_RunningLock.WithExclusiveLock([&] { + if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) + { + m_PendingActions.erase(ActionLsn); + } + else + { + m_RunningMap.erase(FindIt); + } + }); + }); + + m_ActionHistoryLock.WithExclusiveLock([&] { + ActionHistoryEntry Entry{.Lsn = ActionLsn, + .ActionId = Action->ActionId, + .WorkerId = Action->Worker.WorkerId, + 
.ActionDescriptor = Action->ActionObj, + .Succeeded = Action->ActionState() == RunnerAction::State::Completed}; + + std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps)); + + m_ActionHistory.push_back(std::move(Entry)); + + if (m_ActionHistory.size() > m_HistoryLimit) + { + m_ActionHistory.pop_front(); + } + }); + }); + m_RetiredCount.fetch_add(1); + m_ResultRate.Mark(1); + ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", + Action->ActionId, + ActionLsn, + Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); + break; + } + } + } +} + +size_t +FunctionServiceSession::Impl::QueryCapacity() +{ + return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity(); +} + +std::vector<SubmitResult> +FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +////////////////////////////////////////////////////////////////////////// + +FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver) +{ + m_Impl = std::make_unique<Impl>(this, InChunkResolver); +} + +FunctionServiceSession::~FunctionServiceSession() +{ + Shutdown(); +} + +bool +FunctionServiceSession::IsHealthy() +{ + return m_Impl->IsHealthy(); +} + +void +FunctionServiceSession::Shutdown() +{ + m_Impl->Shutdown(); +} + +void +FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) +{ + m_Impl->StartRecording(InResolver, RecordingPath); +} + +void +FunctionServiceSession::StopRecording() +{ + m_Impl->StopRecording(); +} + +void +FunctionServiceSession::EmitStats(CbObjectWriter& Cbo) +{ + m_Impl->EmitStats(Cbo); +} + +std::vector<IoHash> +FunctionServiceSession::GetKnownWorkerIds() +{ + return m_Impl->GetKnownWorkerIds(); +} + +WorkerDesc 
+FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId) +{ + return m_Impl->GetWorkerDescriptor(WorkerId); +} + +void +FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath) +{ + m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath)); +} + +void +FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName) +{ + m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName)); +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority) +{ + return m_Impl->EnqueueAction(ActionObject, Priority); +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) +{ + return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority); +} + +void +FunctionServiceSession::RegisterWorker(CbPackage Worker) +{ + m_Impl->RegisterWorker(Worker); +} + +HttpResponseCode +FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) +{ + return m_Impl->GetActionResult(ActionLsn, OutResultPackage); +} + +HttpResponseCode +FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) +{ + return m_Impl->FindActionResult(ActionId, OutResultPackage); +} + +std::vector<FunctionServiceSession::ActionHistoryEntry> +FunctionServiceSession::GetActionHistory(int Limit) +{ + return m_Impl->GetActionHistory(Limit); +} + +void +FunctionServiceSession::GetCompleted(CbWriter& Cbo) +{ + m_Impl->GetCompleted(Cbo); +} + +void +FunctionServiceSession::PostUpdate(RunnerAction* Action) +{ + m_Impl->PostUpdate(Action); +} + +////////////////////////////////////////////////////////////////////////// + +void +function_forcelink() +{ +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff 
--git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp new file mode 100644 index 000000000..09a9684a7 --- /dev/null +++ b/src/zencompute/httpfunctionservice.cpp @@ -0,0 +1,709 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/httpfunctionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/system.h> +# include <zenstore/cidstore.h> + +# include <span> + +using namespace std::literals; + +namespace zen::compute { + +constinit AsciiSet g_DecimalSet("0123456789"); +auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); }; + +constinit AsciiSet g_HexSet("0123456789abcdefABCDEF"); +auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); }; + +HttpFunctionService::HttpFunctionService(CidStore& InCidStore, + IHttpStatsService& StatsService, + [[maybe_unused]] const std::filesystem::path& BaseDir) +: m_CidStore(InCidStore) +, m_StatsService(StatsService) +, m_Log(logging::Get("apply")) +, m_BaseDir(BaseDir) +, m_FunctionService(InCidStore) +{ + m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local"); + + m_StatsService.RegisterHandler("apply", *this); + + m_Router.AddMatcher("lsn", DecimalMatcher); + m_Router.AddMatcher("worker", IoHashMatcher); + m_Router.AddMatcher("action", IoHashMatcher); + + m_Router.RegisterRoute( + "ready", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (m_FunctionService.IsHealthy()) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); + } + + return 
HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "workers", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + Cbo.BeginArray("workers"sv); + for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds()) + { + Cbo << WorkerId; + } + Cbo.EndArray(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "workers/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId)) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject()); + } + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + + case HttpVerb::kPost: + { + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + CbObject WorkerSpec = HttpReq.ReadPayloadObject(); + + // Determine which pieces are missing and need to be transmitted + + HashKeySet ChunkSet; + + WorkerSpec.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + ChunkSet.AddHashToSet(Hash); + }); + + CbPackage WorkerPackage; + WorkerPackage.SetObject(WorkerSpec); + + m_CidStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + ZEN_DEBUG("worker {}: all attachments already available", WorkerId); + m_FunctionService.RegisterWorker(WorkerPackage); + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + ChunkSet.IterateHashes([&](const IoHash& Hash) { + ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); + ResponseWriter.AddHash(Hash); + }); + + ResponseWriter.EndArray(); + + ZEN_DEBUG("worker {}: need {} attachments", WorkerId, 
ChunkSet.GetSize()); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage(); + CbObject WorkerSpec = WorkerSpecPackage.GetObject(); + + std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + TotalAttachmentBytes += Buffer.GetCompressedSize(); + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += Buffer.GetCompressedSize(); + ++NewAttachmentCount; + } + } + + ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", + WorkerId, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + m_FunctionService.RegisterWorker(WorkerSpecPackage); + + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/completed", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + m_FunctionService.GetCompleted(Cbo); + + SystemMetrics Sm = GetSystemMetricsForReporting(); + Cbo.BeginObject("metrics"); + Describe(Sm, Cbo); + Cbo.EndObject(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/history", + [this](HttpRouterRequest& Req) { + HttpServerRequest& 
HttpReq = Req.ServerRequest(); + const auto QueryParams = HttpReq.GetQueryParams(); + + int QueryLimit = 50; + + if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false) + { + QueryLimit = ParseInt<int>(LimitParam).value_or(50); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("history"); + for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit)) + { + Cbo.BeginObject(); + Cbo << "lsn"sv << Entry.Lsn; + Cbo << "actionId"sv << Entry.ActionId; + Cbo << "workerId"sv << Entry.WorkerId; + Cbo << "succeeded"sv << Entry.Succeeded; + Cbo << "actionDescriptor"sv << Entry.ActionDescriptor; + + for (const auto& Timestamp : Entry.Timestamps) + { + Cbo.AddInteger( + fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))), + Timestamp); + } + Cbo.EndObject(); + } + Cbo.EndArray(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{lsn}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const int ActionLsn = std::stoi(std::string{Req.GetCapture(1)}); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output); + + if (ResponseCode == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + + return HttpReq.WriteResponse(ResponseCode); + } + break; + + case HttpVerb::kPost: + { + // Add support for cancellation, priority changes + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. 
The preferred path is the + // one which uses the scheduled action lsn for lookups + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + CbPackage Output; + if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output); + ResponseCode != HttpResponseCode::OK) + { + ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) + + if (ResponseCode == HttpResponseCode::NotFound) + { + return HttpReq.WriteResponse(ResponseCode); + } + + return HttpReq.WriteResponse(ResponseCode); + } + + ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2)) + + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId); + + if (!Worker) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + const auto QueryParams = Req.ServerRequest().GetQueryParams(); + + int RequestPriority = -1; + + if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) + { + RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); + } + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + // TODO: return status of all pending or executing jobs + break; + + case HttpVerb::kPost: + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. 
This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject ActionObj = LoadCompactBinaryObject(Payload); + + std::vector<IoHash> NeedList; + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); + + if (NeedList.empty()) + { + // We already have everything, enqueue the action for execution + + if (FunctionServiceSession::EnqueueResult Result = + m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) + { + ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn); + + HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + + return; + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); + + std::span<const CbAttachment> Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + + const uint64_t CompressedSize = DataView.GetCompressedSize(); + + TotalAttachmentBytes += CompressedSize; + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += CompressedSize; + ++NewAttachmentCount; + } + } + + if 
(FunctionServiceSession::EnqueueResult Result = + m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) + { + ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", + ActionObj.GetHash(), + Result.Lsn, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + + return; + } + break; + + default: + break; + } + break; + + default: + break; + } + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto QueryParams = HttpReq.GetQueryParams(); + + int RequestPriority = -1; + + if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) + { + RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); + } + + // Resolve worker + + // + + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject ActionObj = LoadCompactBinaryObject(Payload); + + std::vector<IoHash> NeedList; + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); + + if (NeedList.empty()) + { + // We already have everything, enqueue the action for execution + + if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) + { + ZEN_DEBUG("action accepted (lsn {})", Result.Lsn); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + else + { + // Could not resolve? 
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); + } + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); + } + + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); + + std::span<const CbAttachment> Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + + const uint64_t CompressedSize = DataView.GetCompressedSize(); + + TotalAttachmentBytes += CompressedSize; + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += CompressedSize; + ++NewAttachmentCount; + } + } + + if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) + { + ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)", + Result.Lsn, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + else + { + // Could not resolve? 
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); + } + } + return; + } + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "workers/all", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + std::vector<IoHash> WorkerIds = m_FunctionService.GetKnownWorkerIds(); + + CbObjectWriter Cbo; + Cbo.BeginArray("workers"); + + for (const IoHash& WorkerId : WorkerIds) + { + Cbo.BeginObject(); + + Cbo << "id" << WorkerId; + + const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId); + + Cbo << "descriptor" << Descriptor.Descriptor.GetObject(); + + Cbo.EndObject(); + } + + Cbo.EndArray(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "sysinfo", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + SystemMetrics Sm = GetSystemMetricsForReporting(); + + CbObjectWriter Cbo; + Describe(Sm, Cbo); + + Cbo << "cpu_usage" << Sm.CpuUsagePercent; + Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024; + Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024; + Cbo << "disk_used" << 100 * 1024; + Cbo << "disk_total" << 100 * 1024 * 1024; + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "record/start", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording"); + + return HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "record/stop", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + m_FunctionService.StopRecording(); + + return HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); +} + +HttpFunctionService::~HttpFunctionService() +{ + 
m_StatsService.UnregisterHandler("apply", *this); +} + +void +HttpFunctionService::Shutdown() +{ + m_FunctionService.Shutdown(); +} + +const char* +HttpFunctionService::BaseUri() const +{ + return "/apply/"; +} + +void +HttpFunctionService::HandleRequest(HttpServerRequest& Request) +{ + metrics::OperationTiming::Scope $(m_HttpRequests); + + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +void +HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + m_FunctionService.EmitStats(Cbo); + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +////////////////////////////////////////////////////////////////////////// + +void +httpfunction_forcelink() +{ +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/httporchestrator.cpp b/src/zencompute/httporchestrator.cpp new file mode 100644 index 000000000..39e7e60d7 --- /dev/null +++ b/src/zencompute/httporchestrator.cpp @@ -0,0 +1,81 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "zencompute/httporchestrator.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/logging.h> + +namespace zen::compute { + +HttpOrchestratorService::HttpOrchestratorService() : m_Log(logging::Get("orch")) +{ + m_Router.RegisterRoute( + "provision", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + Cbo.BeginArray("workers"); + + m_KnownWorkersLock.WithSharedLock([&] { + for (const auto& [WorkerId, Worker] : m_KnownWorkers) + { + Cbo.BeginObject(); + Cbo << "uri" << Worker.BaseUri; + Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs(); + Cbo.EndObject(); + } + }); + + Cbo.EndArray(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "announce", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObject Data = HttpReq.ReadPayloadObject(); + + std::string_view WorkerId = Data["id"].AsString(""); + std::string_view WorkerUri = Data["uri"].AsString(""); + + if (WorkerId.empty() || WorkerUri.empty()) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + m_KnownWorkersLock.WithExclusiveLock([&] { + auto& Worker = m_KnownWorkers[std::string(WorkerId)]; + Worker.BaseUri = WorkerUri; + Worker.LastSeen.Reset(); + }); + + HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); +} + +HttpOrchestratorService::~HttpOrchestratorService() +{ +} + +const char* +HttpOrchestratorService::BaseUri() const +{ + return "/orch/"; +} + +void +HttpOrchestratorService::HandleRequest(HttpServerRequest& Request) +{ + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +} // namespace zen::compute diff --git a/src/zencompute/include/zencompute/functionservice.h b/src/zencompute/include/zencompute/functionservice.h new file mode 100644 index 000000000..1deb99fd5 --- /dev/null +++ 
b/src/zencompute/include/zencompute/functionservice.h @@ -0,0 +1,132 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#if !defined(ZEN_WITH_COMPUTE_SERVICES) +# define ZEN_WITH_COMPUTE_SERVICES 1 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/iohash.h> +# include <zenstore/zenstore.h> +# include <zenhttp/httpcommon.h> + +# include <filesystem> + +namespace zen { +class ChunkResolver; +class CbObjectWriter; +} // namespace zen + +namespace zen::compute { + +class ActionRecorder; +class FunctionServiceSession; +class IActionResultHandler; +class LocalProcessRunner; +class RemoteHttpRunner; +struct RunnerAction; +struct SubmitResult; + +struct WorkerDesc +{ + CbPackage Descriptor; + IoHash WorkerId{IoHash::Zero}; + + inline operator bool() const { return WorkerId != IoHash::Zero; } +}; + +/** + * Lambda style compute function service + * + * The responsibility of this class is to accept function execution requests, and + * schedule them using one or more FunctionRunner instances. It will basically always + * accept requests, queueing them if necessary, and then hand them off to runners + * as they become available. + * + * This is typically fronted by an API service that handles communication with clients. 
+ */ +class FunctionServiceSession final +{ +public: + FunctionServiceSession(ChunkResolver& InChunkResolver); + ~FunctionServiceSession(); + + void Shutdown(); + bool IsHealthy(); + + // Worker registration and discovery + + void RegisterWorker(CbPackage Worker); + [[nodiscard]] WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); + [[nodiscard]] std::vector<IoHash> GetKnownWorkerIds(); + + // Action runners + + void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath); + void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName); + + // Action submission + + struct EnqueueResult + { + int Lsn; + CbObject ResponseMessage; + + inline operator bool() const { return Lsn != 0; } + }; + + [[nodiscard]] EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int Priority); + [[nodiscard]] EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); + + // Completed action tracking + + [[nodiscard]] HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); + [[nodiscard]] HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); + + void GetCompleted(CbWriter&); + + // Action history tracking (note that this is separate from completed action tracking, and + // will include actions which have been retired and no longer have their results available) + + struct ActionHistoryEntry + { + int Lsn; + IoHash ActionId; + IoHash WorkerId; + CbObject ActionDescriptor; + bool Succeeded; + uint64_t Timestamps[5] = {}; + }; + + [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100); + + // Stats reporting + + void EmitStats(CbObjectWriter& Cbo); + + // Recording + + void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); + void StopRecording(); + +private: + void PostUpdate(RunnerAction* Action); + + friend class FunctionRunner; + friend struct RunnerAction; + + struct Impl; 
+ std::unique_ptr<Impl> m_Impl; +}; + +void function_forcelink(); + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/httpfunctionservice.h b/src/zencompute/include/zencompute/httpfunctionservice.h new file mode 100644 index 000000000..6e2344ae6 --- /dev/null +++ b/src/zencompute/include/zencompute/httpfunctionservice.h @@ -0,0 +1,73 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#if !defined(ZEN_WITH_COMPUTE_SERVICES) +# define ZEN_WITH_COMPUTE_SERVICES 1 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "zencompute/functionservice.h" + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/iohash.h> +# include <zencore/logging.h> +# include <zentelemetry/stats.h> +# include <zenhttp/httpserver.h> + +# include <deque> +# include <filesystem> +# include <unordered_map> + +namespace zen { +class CidStore; +} + +namespace zen::compute { + +class HttpFunctionService; +class FunctionService; + +/** + * HTTP interface for compute function service + */ +class HttpFunctionService : public HttpService, public IHttpStatsProvider +{ +public: + HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir); + ~HttpFunctionService(); + + void Shutdown(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + // IHttpStatsProvider + + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + +protected: + CidStore& m_CidStore; + IHttpStatsService& m_StatsService; + LoggerRef Log() { return m_Log; } + +private: + LoggerRef m_Log; + std::filesystem ::path m_BaseDir; + HttpRequestRouter m_Router; + FunctionServiceSession m_FunctionService; + + // Metrics + + metrics::OperationTiming m_HttpRequests; +}; + +void httpfunction_forcelink(); + +} // namespace zen::compute + +#endif // 
ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h new file mode 100644 index 000000000..168c6d7fe --- /dev/null +++ b/src/zencompute/include/zencompute/httporchestrator.h @@ -0,0 +1,44 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zenhttp/httpserver.h> + +#include <unordered_map> + +namespace zen::compute { + +/** + * Mock orchestrator service, for testing dynamic provisioning + */ + +class HttpOrchestratorService : public HttpService +{ +public: + HttpOrchestratorService(); + ~HttpOrchestratorService(); + + HttpOrchestratorService(const HttpOrchestratorService&) = delete; + HttpOrchestratorService& operator=(const HttpOrchestratorService&) = delete; + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + +private: + HttpRequestRouter m_Router; + LoggerRef m_Log; + + struct KnownWorker + { + std::string_view BaseUri; + Stopwatch LastSeen; + }; + + RwLock m_KnownWorkersLock; + std::unordered_map<std::string, KnownWorker> m_KnownWorkers; +}; + +} // namespace zen::compute diff --git a/src/zencompute/include/zencompute/recordingreader.h b/src/zencompute/include/zencompute/recordingreader.h new file mode 100644 index 000000000..bf1aff125 --- /dev/null +++ b/src/zencompute/include/zencompute/recordingreader.h @@ -0,0 +1,127 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencompute/functionservice.h> +#include <zencompute/zencompute.h> +#include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> +#include <zenstore/cidstore.h> +#include <zenstore/gc.h> +#include <zenstore/zenstore.h> + +#include <filesystem> +#include <functional> +#include <unordered_map> + +namespace zen { +class CbObject; +class CbPackage; +struct IoHash; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +class RecordingReaderBase +{ + RecordingReaderBase(const RecordingReaderBase&) = delete; + RecordingReaderBase& operator=(const RecordingReaderBase&) = delete; + +public: + RecordingReaderBase() = default; + virtual ~RecordingReaderBase() = 0; + virtual std::unordered_map<IoHash, CbPackage> ReadWorkers() = 0; + virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) = 0; + virtual size_t GetActionCount() const = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +/** + * Reader for recordings done via the zencompute recording system, which + * have a shared chunk store and a log of actions with pointers into the + * chunk store for their data. 
+ */ +class RecordingReader : public RecordingReaderBase, public ChunkResolver +{ +public: + explicit RecordingReader(const std::filesystem::path& RecordingPath); + ~RecordingReader(); + + virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override; + + virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, + int TargetParallelism) override; + virtual size_t GetActionCount() const override; + +private: + std::filesystem::path m_RecordingLogDir; + BasicFile m_WorkerDataFile; + BasicFile m_ActionDataFile; + GcManager m_Gc; + CidStore m_CidStore{m_Gc}; + + // ChunkResolver interface + virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + + struct ActionEntry + { + IoHash ActionId; + uint64_t Offset; + uint64_t Size; + }; + + std::vector<ActionEntry> m_Actions; + + void ScanActions(); +}; + +////////////////////////////////////////////////////////////////////////// + +struct LocalResolver : public ChunkResolver +{ + LocalResolver(const LocalResolver&) = delete; + LocalResolver& operator=(const LocalResolver&) = delete; + + LocalResolver() = default; + ~LocalResolver() = default; + + virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + void Add(const IoHash& Cid, IoBuffer Data); + +private: + RwLock MapLock; + std::unordered_map<IoHash, IoBuffer> Attachments; +}; + +/** + * This is a reader for UE/DDB recordings, which have a different layout on + * disk (no shared chunk store) + */ +class UeRecordingReader : public RecordingReaderBase, public ChunkResolver +{ +public: + explicit UeRecordingReader(const std::filesystem::path& RecordingPath); + ~UeRecordingReader(); + + virtual std::unordered_map<zen::IoHash, zen::CbPackage> ReadWorkers() override; + virtual void IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, + int TargetParallelism) override; + virtual size_t GetActionCount() const override; + virtual IoBuffer 
FindChunkByCid(const IoHash& DecompressedId) override; + +private: + std::filesystem::path m_RecordingDir; + LocalResolver m_LocalResolver; + std::vector<std::filesystem::path> m_WorkDirs; + + CbPackage ReadAction(std::filesystem::path WorkDir); +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/zencompute.h b/src/zencompute/include/zencompute/zencompute.h new file mode 100644 index 000000000..6dc32eeea --- /dev/null +++ b/src/zencompute/include/zencompute/zencompute.h @@ -0,0 +1,11 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +namespace zen { + +void zencompute_forcelinktests(); + +} diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp new file mode 100644 index 000000000..9a27f3f3d --- /dev/null +++ b/src/zencompute/localrunner.cpp @@ -0,0 +1,722 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "localrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/system.h> +# include <zencore/scopeguard.h> +# include <zencore/timer.h> +# include <zenstore/cidstore.h> + +# include <span> + +namespace zen::compute { + +using namespace std::literals; + +LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("local_exec")) +, m_ChunkResolver(Resolver) +, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers")) +, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch")) +{ + SystemMetrics Sm = GetSystemMetricsForReporting(); + + m_MaxRunningActions = 
Sm.LogicalProcessorCount * 2; + + ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions); + + bool DidCleanup = false; + + if (std::filesystem::is_directory(m_ActionsPath)) + { + ZEN_INFO("Cleaning '{}'", m_ActionsPath); + + std::error_code Ec; + CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message()); + } + + DidCleanup = true; + } + + if (std::filesystem::is_directory(m_SandboxPath)) + { + ZEN_INFO("Cleaning '{}'", m_SandboxPath); + std::error_code Ec; + CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message()); + } + + DidCleanup = true; + } + + // We clean out all workers on startup since we can't know they are good. They could be bad + // due to tampering, malware (which I also mean to include AV and antimalware software) or + // other processes we have no control over + if (std::filesystem::is_directory(m_WorkerPath)) + { + ZEN_INFO("Cleaning '{}'", m_WorkerPath); + std::error_code Ec; + CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message()); + } + + DidCleanup = true; + } + + if (DidCleanup) + { + ZEN_INFO("Cleanup complete"); + } + + m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; + +# if ZEN_PLATFORM_WINDOWS + // Suppress any error dialogs caused by missing dependencies + UINT OldMode = ::SetErrorMode(0); + ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS); +# endif + + m_AcceptNewActions = true; +} + +LocalProcessRunner::~LocalProcessRunner() +{ + try + { + Shutdown(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception during local process runner shutdown: {}", Ex.what()); + } +} + +void +LocalProcessRunner::Shutdown() +{ + m_AcceptNewActions = false; + + m_MonitorThreadEnabled = false; + 
m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } + + CancelRunningActions(); +} + +std::filesystem::path +LocalProcessRunner::CreateNewSandbox() +{ + std::string UniqueId = std::to_string(++m_SandboxCounter); + std::filesystem::path Path = m_SandboxPath / UniqueId; + zen::CreateDirectories(Path); + + return Path; +} + +void +LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + if (m_DumpActions) + { + CbObject WorkerDescriptor = WorkerPackage.GetObject(); + const IoHash& WorkerId = WorkerPackage.GetObjectHash(); + + std::string UniqueId = fmt::format("worker_{}"sv, WorkerId); + std::filesystem::path Path = m_ActionsPath / UniqueId; + + zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer()); + + ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) { + std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString(); + zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed()); + }); + + ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path); + } +} + +size_t +LocalProcessRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + if (!m_AcceptNewActions) + { + return 0; + } + + size_t RunningCount = m_RunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector<SubmitResult> +LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +SubmitResult +LocalProcessRunner::SubmitAction(Ref<RunnerAction> Action) +{ + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + + if (!m_AcceptNewActions) + { + return SubmitResult{.IsAccepted = 
false}; + } + + if (m_RunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + std::filesystem::path SandboxPath = CreateNewSandbox(); + + CbPackage WorkerPackage = Action->Worker.Descriptor; + + std::filesystem::path WorkerPath = ManifestWorker(Action->Worker); + + // Write out action + + zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer()); + + // Manifest inputs in sandbox + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()}; + IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid); + + if (!DataBuffer) + { + throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); + } + + zen::WriteFile(FilePath, DataBuffer); + }); + +# if ZEN_PLATFORM_WINDOWS + // Set up environment variables + + StringBuilder<1024> EnvironmentBlock; + + CbObject WorkerDescription = WorkerPackage.GetObject(); + + for (auto& It : WorkerDescription["environment"sv]) + { + EnvironmentBlock.Append(It.AsString()); + EnvironmentBlock.Append('\0'); + } + EnvironmentBlock.Append('\0'); + EnvironmentBlock.Append('\0'); + + // Execute process - this spawns the child process immediately without waiting + // for completion + + std::string_view ExecPath = WorkerDescription["path"sv].AsString(); + std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred(); + + ExtendableWideStringBuilder<512> CommandLine; + 
CommandLine.Append(L'"'); + CommandLine.Append(ExePath.c_str()); + CommandLine.Append(L'"'); + CommandLine.Append(L" -Build=build.action"); + + LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; + LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; + BOOL bInheritHandles = FALSE; + DWORD dwCreationFlags = 0; + + STARTUPINFO StartupInfo{}; + StartupInfo.cb = sizeof StartupInfo; + + PROCESS_INFORMATION ProcessInformation{}; + + ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str())); + + CommandLine.EnsureNulTerminated(); + + BOOL Success = CreateProcessW(nullptr, + CommandLine.Data(), + lpProcessAttributes, + lpThreadAttributes, + bInheritHandles, + dwCreationFlags, + (LPVOID)EnvironmentBlock.Data(), // Environment block + SandboxPath.c_str(), // Current directory + &StartupInfo, + /* out */ &ProcessInformation); + + if (!Success) + { + // TODO: this is probably not the best way to report failure. The return + // object should include a failure state and context + + zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); + } + + CloseHandle(ProcessInformation.hThread); + + Ref<RunningAction> NewAction{new RunningAction()}; + NewAction->Action = Action; + NewAction->ProcessHandle = ProcessInformation.hProcess; + NewAction->SandboxPath = std::move(SandboxPath); + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RunningMap[ActionLsn] = std::move(NewAction); + } + + Action->SetActionState(RunnerAction::State::Running); +# else + ZEN_UNUSED(ActionId); + + ZEN_NOT_IMPLEMENTED(); + + int ExitCode = 0; +# endif + + return SubmitResult{.IsAccepted = true}; +} + +size_t +LocalProcessRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RunningMap.size(); +} + +std::filesystem::path +LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker) +{ + RwLock::SharedLockScope _(m_WorkerLock); + + std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId); + + if 
(!std::filesystem::exists(WorkerDir)) + { + _.ReleaseNow(); + + RwLock::ExclusiveLockScope $(m_WorkerLock); + + if (!std::filesystem::exists(WorkerDir)) + { + ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {}); + } + } + + return WorkerDir; +} + +void +LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback) +{ + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + + CompressedBuffer Compressed; + + if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash)) + { + Compressed = Attachment->AsCompressedBinary(); + } + else + { + IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash); + + if (!DataBuffer) + { + throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash)); + } + + uint64_t DataRawSize = 0; + IoHash DataRawHash; + Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize); + + if (DataRawSize != Size) + { + throw std::runtime_error( + fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); + } + } + + ChunkReferenceCallback(ChunkHash, Compressed); + + std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()}; + + SharedBuffer Decompressed = Compressed.Decompress(); + zen::WriteFile(FilePath, Decompressed.AsIoBuffer()); +} + +void +LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback) +{ + CbObject WorkerDescription = WorkerPackage.GetObject(); + + // Manifest worker in Sandbox + + for (auto& It : WorkerDescription["executables"sv]) 
+ { + DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); + } + + for (auto& It : WorkerDescription["dirs"sv]) + { + std::string_view Name = It.AsString(); + std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()}; + zen::CreateDirectories(DirPath); + } + + for (auto& It : WorkerDescription["files"sv]) + { + DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); + } + + WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer()); +} + +CbPackage +LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath) +{ + std::filesystem::path OutputFile = SandboxPath / "build.output"; + FileContents OutputData = zen::ReadFile(OutputFile); + + if (OutputData.ErrorCode) + { + throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile)); + } + + CbPackage OutputPackage; + CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten()); + + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalRawAttachmentBytes = 0; + + Output.IterateAttachments([&](CbFieldView Field) { + IoHash Hash = Field.AsHash(); + std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; + FileContents ChunkData = zen::ReadFile(OutputPath); + + if (ChunkData.ErrorCode) + { + throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath)); + } + + uint64_t ChunkDataRawSize = 0; + IoHash ChunkDataHash; + CompressedBuffer AttachmentBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize); + + if (!AttachmentBuffer) + { + throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += ChunkDataRawSize; + + CbAttachment 
Attachment(std::move(AttachmentBuffer), ChunkDataHash); + OutputPackage.AddAttachment(Attachment); + }); + + OutputPackage.SetObject(Output); + + ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", + OutputPackage.GetAttachments().size(), + NiceBytes(TotalAttachmentBytes), + NiceBytes(TotalRawAttachmentBytes)); + + return OutputPackage; +} + +void +LocalProcessRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("LocalProcessRunner_Monitor"); + + auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); + + do + { + // On Windows it's possible to wait on process handles, so we wait for either a process to exit + // or for the monitor event to be signaled (which indicates we should check for cancellation + // or shutdown). This could be further improved by using a completion port and registering process + // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing + // with an enormous number of concurrent processes. + // + // On other platforms we just wait on the monitor event and poll for process exits at intervals. 
+# if ZEN_PLATFORM_WINDOWS + auto WaitOnce = [&] { + HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS]; + + uint32_t NumHandles = 0; + + WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle(); + + m_RunningLock.WithSharedLock([&] { + for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It) + { + Ref<RunningAction> Action = It->second; + + WaitHandles[NumHandles++] = Action->ProcessHandle; + } + }); + + DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000); + + // return true if a handle was signaled + return (WaitResult <= NumHandles); + }; +# else + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); }; +# endif + + while (!WaitOnce()) + { + if (m_MonitorThreadEnabled == false) + { + return; + } + + SweepRunningActions(); + } + + // Signal received + + SweepRunningActions(); + } while (m_MonitorThreadEnabled); +} + +void +LocalProcessRunner::CancelRunningActions() +{ + Stopwatch Timer; + std::unordered_map<int, Ref<RunningAction>> RunningMap; + + m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); + + if (RunningMap.empty()) + { + return; + } + + ZEN_INFO("cancelling all running actions"); + + // For expedience we initiate the process termination for all known + // processes before attempting to wait for them to exit. 
+ + std::vector<int> TerminatedLsnList; + + for (const auto& Kv : RunningMap) + { + Ref<RunningAction> Action = Kv.second; + + // Terminate running process + +# if ZEN_PLATFORM_WINDOWS + BOOL Success = TerminateProcess(Action->ProcessHandle, 222); + + if (Success) + { + TerminatedLsnList.push_back(Kv.first); + } + else + { + DWORD LastError = GetLastError(); + + if (LastError != ERROR_ACCESS_DENIED) + { + ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError)); + } + } +# else + ZEN_NOT_IMPLEMENTED("need to implement process termination"); +# endif + } + + // We only post results for processes we have terminated, in order + // to avoid multiple results getting posted for the same action + + for (int Lsn : TerminatedLsnList) + { + if (auto It = RunningMap.find(Lsn); It != RunningMap.end()) + { + Ref<RunningAction> Running = It->second; + +# if ZEN_PLATFORM_WINDOWS + if (Running->ProcessHandle != INVALID_HANDLE_VALUE) + { + DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000); + + if (WaitResult != WAIT_OBJECT_0) + { + ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult); + } + else + { + ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn); + } + } +# endif + + // Clean up and post error result + + DeleteDirectories(Running->SandboxPath); + Running->Action->SetActionState(RunnerAction::State::Failed); + } + } + + ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +void +LocalProcessRunner::SweepRunningActions() +{ + std::vector<Ref<RunningAction>> CompletedActions; + + m_RunningLock.WithExclusiveLock([&] { + // TODO: It would be good to not hold the exclusive lock while making + // system calls and other expensive operations. 
+ + for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;) + { + Ref<RunningAction> Action = It->second; + +# if ZEN_PLATFORM_WINDOWS + DWORD ExitCode = 0; + BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode); + + if (IsSuccess && ExitCode != STILL_ACTIVE) + { + CloseHandle(Action->ProcessHandle); + Action->ProcessHandle = INVALID_HANDLE_VALUE; + + CompletedActions.push_back(std::move(Action)); + It = m_RunningMap.erase(It); + } + else + { + ++It; + } +# else + // TODO: implement properly for Mac/Linux + + ZEN_UNUSED(Action); +# endif + } + }); + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (Ref<RunningAction> Running : CompletedActions) + { + const int ActionLsn = Running->Action->ActionLsn; + + if (Running->ExitCode == 0) + { + try + { + // Gather outputs + + CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath); + + Running->Action->SetResult(std::move(OutputPackage)); + Running->Action->SetActionState(RunnerAction::State::Completed); + + // We can delete the files at this point + if (!DeleteDirectories(Running->SandboxPath)) + { + ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath); + } + + // Success -- continue with next iteration of the loop + continue; + } + catch (std::exception& Ex) + { + ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what()); + } + } + + // Failed - for now this is indicated with an empty package in + // the results map. We can clean out the sandbox directory immediately. 
+ + std::error_code Ec; + DeleteDirectories(Running->SandboxPath, Ec); + + if (Ec) + { + ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message()); + } + + Running->Action->SetActionState(RunnerAction::State::Failed); + } +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h new file mode 100644 index 000000000..35f464805 --- /dev/null +++ b/src/zencompute/localrunner.h @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include <zencore/thread.h> +# include <zencore/zencore.h> +# include <zenstore/cidstore.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/logging.h> + +# include <atomic> +# include <filesystem> +# include <thread> + +namespace zen { +class CbPackage; +} + +namespace zen::compute { + +/** Direct process spawner + + This runner simply sets up a directory structure for each job and + creates a process to perform the computation in it. It is not very + efficient and is intended mostly for testing. 
+ + */ + +class LocalProcessRunner : public FunctionRunner +{ + LocalProcessRunner(LocalProcessRunner&&) = delete; + LocalProcessRunner& operator=(LocalProcessRunner&&) = delete; + +public: + LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir); + ~LocalProcessRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; + [[nodiscard]] virtual bool IsHealthy() override { return true; } + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + struct RunningAction : public RefCounted + { + Ref<RunnerAction> Action; + void* ProcessHandle = nullptr; + int ExitCode = 0; + std::filesystem::path SandboxPath; + }; + + std::atomic_bool m_AcceptNewActions; + ChunkResolver& m_ChunkResolver; + RwLock m_WorkerLock; + std::filesystem::path m_WorkerPath; + std::atomic<int32_t> m_SandboxCounter = 0; + std::filesystem::path m_SandboxPath; + int32_t m_MaxRunningActions = 64; // arbitrary limit for testing + + // if used in conjuction with m_ResultsLock, this lock must be taken *after* + // m_ResultsLock to avoid deadlocks + RwLock m_RunningLock; + std::unordered_map<int, Ref<RunningAction>> m_RunningMap; + + std::thread m_MonitorThread; + std::atomic<bool> m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + void SweepRunningActions(); + void CancelRunningActions(); + + std::filesystem::path CreateNewSandbox(); + void ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function<void(const IoHash&, CompressedBuffer&)>&& ChunkReferenceCallback); + std::filesystem::path 
ManifestWorker(const WorkerDesc& Worker); + CbPackage GatherActionOutputs(std::filesystem::path SandboxPath); + + void DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function<void(const IoHash&, CompressedBuffer&)>& ChunkReferenceCallback); +}; + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp new file mode 100644 index 000000000..1c1a119cf --- /dev/null +++ b/src/zencompute/recordingreader.cpp @@ -0,0 +1,335 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/recordingreader.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# define ZEN_CONCRT_AVAILABLE 1 +#else +# define ZEN_CONCRT_AVAILABLE 0 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +# if ZEN_PLATFORM_WINDOWS +# define ZEN_BUILD_ACTION L"Build.action" +# define ZEN_WORKER_UCB L"worker.ucb" +# else +# define ZEN_BUILD_ACTION "Build.action" +# define ZEN_WORKER_UCB "worker.ucb" +# endif + +////////////////////////////////////////////////////////////////////////// + +struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor +{ + virtual void VisitFile(const std::filesystem::path& Parent, + const path_view& File, + uint64_t FileSize, + uint32_t NativeModeOrAttributes, + uint64_t NativeModificationTick) + { + ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick); + + if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0) + { + WorkDirs.push_back(Parent); + } + else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0) + { + 
WorkerDirs.push_back(Parent); + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes) + { + ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes); + + return true; + } + + std::vector<std::filesystem::path> WorkerDirs; + std::vector<std::filesystem::path> WorkDirs; +}; + +////////////////////////////////////////////////////////////////////////// + +void +IterateOverArray(auto Array, auto Func, int TargetParallelism) +{ +# if ZEN_CONCRT_AVAILABLE + if (TargetParallelism > 1) + { + concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism); + concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); }); + + return; + } +# else + ZEN_UNUSED(TargetParallelism); +# endif + + for (const auto& Item : Array) + { + Func(Item); + } +} + +////////////////////////////////////////////////////////////////////////// + +RecordingReaderBase::~RecordingReaderBase() = default; + +////////////////////////////////////////////////////////////////////////// + +RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath) +{ + CidStoreConfiguration CidConfig; + CidConfig.RootDirectory = m_RecordingLogDir / "cid"; + CidConfig.HugeValueThreshold = 128 * 1024 * 1024; + + m_CidStore.Initialize(CidConfig); +} + +RecordingReader::~RecordingReader() +{ + m_CidStore.Flush(); +} + +size_t +RecordingReader::GetActionCount() const +{ + return m_Actions.size(); +} + +IoBuffer +RecordingReader::FindChunkByCid(const IoHash& DecompressedId) +{ + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId)) + { + return Chunk; + } + + ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId); + + return {}; +} + +std::unordered_map<zen::IoHash, zen::CbPackage> +RecordingReader::ReadWorkers() +{ + std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap; + + { + CbObjectFromFile TocFile = 
LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc"); + CbObject Toc = TocFile.Object; + + m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead); + + ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); + + for (auto& It : Toc["toc"]) + { + CbArrayView Entry = It.AsArrayView(); + CbFieldViewIterator Vit = Entry.CreateViewIterator(); + + const IoHash WorkerId = Vit++->AsHash(); + const uint64_t Offset = Vit++->AsInt64(0); + const uint64_t Size = Vit++->AsInt64(0); + + IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size); + CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange); + CbPackage& WorkerPkg = WorkerMap[WorkerId]; + WorkerPkg.SetObject(WorkerDesc); + + WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid); + + if (AttachmentData) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); + } + }); + } + } + + // Scan actions as well (this should be called separately, ideally) + + ScanActions(); + + return WorkerMap; +} + +void +RecordingReader::ScanActions() +{ + CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc"); + CbObject Toc = TocFile.Object; + + m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead); + + ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); + + for (auto& It : Toc["toc"]) + { + CbArrayView ArrayEntry = It.AsArrayView(); + CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator(); + + ActionEntry Entry; + Entry.ActionId = Vit++->AsHash(); + Entry.Offset = Vit++->AsInt64(0); + Entry.Size = Vit++->AsInt64(0); + + m_Actions.push_back(Entry); + } +} + +void +RecordingReader::IterateActions(std::function<void(CbObject 
ActionObject, const IoHash& ActionId)>&& Callback, int TargetParallelism) +{ + IterateOverArray( + m_Actions, + [&](const ActionEntry& Entry) { + CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size)); + + Callback(ActionDesc, Entry.ActionId); + }, + TargetParallelism); +} + +////////////////////////////////////////////////////////////////////////// + +IoBuffer +LocalResolver::FindChunkByCid(const IoHash& DecompressedId) +{ + RwLock::SharedLockScope _(MapLock); + if (auto It = Attachments.find(DecompressedId); It != Attachments.end()) + { + return It->second; + } + + return {}; +} + +void +LocalResolver::Add(const IoHash& Cid, IoBuffer Data) +{ + RwLock::ExclusiveLockScope _(MapLock); + Data.SetContentType(ZenContentType::kCompressedBinary); + Attachments[Cid] = Data; +} + +/// + +UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath) +{ +} + +UeRecordingReader::~UeRecordingReader() +{ +} + +size_t +UeRecordingReader::GetActionCount() const +{ + return m_WorkDirs.size(); +} + +IoBuffer +UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId) +{ + return m_LocalResolver.FindChunkByCid(DecompressedId); +} + +std::unordered_map<zen::IoHash, zen::CbPackage> +UeRecordingReader::ReadWorkers() +{ + std::unordered_map<zen::IoHash, zen::CbPackage> WorkerMap; + + FileSystemTraversal Traversal; + RecordingTreeVisitor Visitor; + Traversal.TraverseFileSystem(m_RecordingDir, Visitor); + + m_WorkDirs = std::move(Visitor.WorkDirs); + + for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs) + { + CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb"); + CbObject WorkerDesc = WorkerFile.Object; + const IoHash& WorkerId = WorkerFile.Hash; + CbPackage& WorkerPkg = WorkerMap[WorkerId]; + WorkerPkg.SetObject(WorkerDesc); + + WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = 
AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten(); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); + }); + } + + return WorkerMap; +} + +void +UeRecordingReader::IterateActions(std::function<void(CbObject ActionObject, const IoHash& ActionId)>&& Callback, int ParallelismTarget) +{ + IterateOverArray( + m_WorkDirs, + [&](const std::filesystem::path& WorkDir) { + CbPackage WorkPackage = ReadAction(WorkDir); + CbObject ActionObject = WorkPackage.GetObject(); + const IoHash& ActionId = WorkPackage.GetObjectHash(); + + Callback(ActionObject, ActionId); + }, + ParallelismTarget); +} + +CbPackage +UeRecordingReader::ReadAction(std::filesystem::path WorkDir) +{ + CbPackage WorkPackage; + std::filesystem::path WorkDescPath = WorkDir / "Build.action"; + CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath); + CbObject& ActionObject = ActionFile.Object; + + WorkPackage.SetObject(ActionObject); + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten(); + + m_LocalResolver.Add(AttachmentCid, AttachmentData); + + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + ZEN_ASSERT(AttachmentCid == RawHash); + WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash)); + }); + + return WorkPackage; +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/remotehttprunner.cpp b/src/zencompute/remotehttprunner.cpp new file mode 100644 index 000000000..98ced5fe8 --- /dev/null 
+++ b/src/zencompute/remotehttprunner.cpp @@ -0,0 +1,457 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remotehttprunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/scopeguard.h> +# include <zenhttp/httpcommon.h> +# include <zenstore/cidstore.h> + +# include <span> + +////////////////////////////////////////////////////////////////////////// + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("http_exec")) +, m_ChunkResolver{InChunkResolver} +, m_BaseUrl{fmt::format("{}/apply", HostName)} +, m_Http(m_BaseUrl) +{ + m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; +} + +RemoteHttpRunner::~RemoteHttpRunner() +{ + Shutdown(); +} + +void +RemoteHttpRunner::Shutdown() +{ + // TODO: should cleanly drain/cancel pending work + + m_MonitorThreadEnabled = false; + m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } +} + +void +RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + CbPackage WorkerDesc = WorkerPackage; + + std::string WorkerUrl = fmt::format("/workers/{}", WorkerId); + + HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl); + + if (WorkerResponse.StatusCode == HttpResponseCode::NotFound) + { + HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject()); + + if 
(DescResponse.StatusCode == HttpResponseCode::NotFound) + { + CbPackage Pkg = WorkerDesc; + + // Build response package by sending only the attachments + // the other end needs. We start with the full package and + // remove the attachments which are not needed. + + { + std::unordered_set<IoHash> Needed; + + CbObject Response = DescResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + Needed.insert(NeedHash); + } + + std::unordered_set<IoHash> ToRemove; + + for (const CbAttachment& Attachment : Pkg.GetAttachments()) + { + const IoHash& Hash = Attachment.GetHash(); + + if (Needed.find(Hash) == Needed.end()) + { + ToRemove.insert(Hash); + } + } + + for (const IoHash& Hash : ToRemove) + { + int RemovedCount = Pkg.RemoveAttachment(Hash); + + ZEN_ASSERT(RemovedCount == 1); + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg); + + if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + } + else if (!IsHttpSuccessCode(DescResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + else + { + ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent); + } + } + else if (WorkerResponse.StatusCode == HttpResponseCode::OK) + { + // Already known from a previous run + } + else if (!IsHttpSuccessCode(WorkerResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})", + WorkerId, + m_Http.GetBaseUri(), + WorkerUrl, + (int)WorkerResponse.StatusCode, + ToString(WorkerResponse.StatusCode)); + + // TODO: propagate error + } +} + +size_t +RemoteHttpRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + 
size_t RunningCount = m_RemoteRunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector<SubmitResult> +RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +SubmitResult +RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) +{ + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + // Enqueue job + + CbObject Result; + + HttpClient::Response WorkResponse = m_Http.Post("/jobs", ActionObj); + HttpResponseCode WorkResponseCode = WorkResponse.StatusCode; + + if (WorkResponseCode == HttpResponseCode::OK) + { + Result = WorkResponse.AsObject(); + } + else if (WorkResponseCode == HttpResponseCode::NotFound) + { + // Not all attachments are present + + // Build response package including all required attachments + + CbPackage Pkg; + Pkg.SetObject(ActionObj); + + CbObject Response = WorkResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + { + uint64_t DataRawSize = 0; + IoHash DataRawHash; + CompressedBuffer Compressed = + 
CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); + + ZEN_ASSERT(DataRawHash == NeedHash); + + Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + } + else + { + // No such attachment + + return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post("/jobs", Pkg); + + if (!PayloadResponse) + { + ZEN_WARN("unable to register payloads for action {} at {}/jobs", ActionId, m_Http.GetBaseUri()); + + // TODO: include more information about the failure in the response + + return {.IsAccepted = false, .Reason = "HTTP request failed"}; + } + else if (PayloadResponse.StatusCode == HttpResponseCode::OK) + { + Result = PayloadResponse.AsObject(); + } + else + { + // Unexpected response + + const int ResponseStatusCode = (int)PayloadResponse.StatusCode; + + ZEN_WARN("unable to register payloads for action {} at {}/jobs (error: {} {})", + ActionId, + m_Http.GetBaseUri(), + ResponseStatusCode, + ToString(ResponseStatusCode)); + + return {.IsAccepted = false, + .Reason = fmt::format("unexpected response code {} {} from {}/jobs", + ResponseStatusCode, + ToString(ResponseStatusCode), + m_Http.GetBaseUri())}; + } + } + + if (Result) + { + if (const int32_t LsnField = Result["lsn"].AsInt32(0)) + { + HttpRunningAction NewAction; + NewAction.Action = Action; + NewAction.RemoteActionLsn = LsnField; + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RemoteRunningMap[LsnField] = std::move(NewAction); + } + + ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn); + + Action->SetActionState(RunnerAction::State::Running); + + return SubmitResult{.IsAccepted = true}; + } + } + + return {}; +} + +bool +RemoteHttpRunner::IsHealthy() +{ + if (HttpClient::Response Ready = m_Http.Get("/ready")) + { + return true; + } + else + { + // TODO: use response to propagate context + 
return false; + } +} + +size_t +RemoteHttpRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RemoteRunningMap.size(); +} + +void +RemoteHttpRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("RemoteHttpRunner_Monitor"); + + do + { + const int NormalWaitingTime = 1000; + int WaitTimeMs = NormalWaitingTime; + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); }; + auto SweepOnce = [&] { + const size_t RetiredCount = SweepRunningActions(); + + m_RunningLock.WithSharedLock([&] { + if (m_RemoteRunningMap.size() > 16) + { + WaitTimeMs = NormalWaitingTime / 4; + } + else + { + if (RetiredCount) + { + WaitTimeMs = NormalWaitingTime / 2; + } + else + { + WaitTimeMs = NormalWaitingTime; + } + } + }); + }; + + while (!WaitOnce()) + { + SweepOnce(); + } + + // Signal received - this may mean we should quit + + SweepOnce(); + } while (m_MonitorThreadEnabled); +} + +size_t +RemoteHttpRunner::SweepRunningActions() +{ + std::vector<HttpRunningAction> CompletedActions; + + // Poll remote for list of completed actions + + HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv); + + if (CbObject Completed = ResponseCompleted.AsObject()) + { + for (auto& FieldIt : Completed["completed"sv]) + { + const int32_t CompleteLsn = FieldIt.AsInt32(); + + if (HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn))) + { + m_RunningLock.WithExclusiveLock([&] { + if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end()) + { + HttpRunningAction CompletedAction = std::move(CompleteIt->second); + CompletedAction.ActionResults = ResponseJob.AsPackage(); + CompletedAction.Success = true; + + CompletedActions.push_back(std::move(CompletedAction)); + m_RemoteRunningMap.erase(CompleteIt); + } + else + { + // we received a completion notice for an action we don't know about, + // this can happen if the runner is used by multiple upstream schedulers, + // 
or if this compute node was recently restarted and lost track of + // previously scheduled actions + } + }); + } + } + + if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView()) + { + // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0)) + if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0)) + { + const int32_t NewCap = zen::Max(4, CpuCount); + + if (m_MaxRunningActions > NewCap) + { + ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions); + + m_MaxRunningActions = NewCap; + } + } + } + } + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (HttpRunningAction& HttpAction : CompletedActions) + { + const int ActionLsn = HttpAction.Action->ActionLsn; + + if (HttpAction.Success) + { + ZEN_DEBUG("completed: {} LSN {} (remote LSN {})", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn); + + HttpAction.Action->SetActionState(RunnerAction::State::Completed); + + HttpAction.Action->SetResult(std::move(HttpAction.ActionResults)); + } + else + { + HttpAction.Action->SetActionState(RunnerAction::State::Failed); + } + } + + return CompletedActions.size(); +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/remotehttprunner.h b/src/zencompute/remotehttprunner.h new file mode 100644 index 000000000..1e885da3d --- /dev/null +++ b/src/zencompute/remotehttprunner.h @@ -0,0 +1,80 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 

#pragma once

#include "zencompute/functionservice.h"

#if ZEN_WITH_COMPUTE_SERVICES

#	include "functionrunner.h"

#	include <zencore/compactbinarypackage.h>
#	include <zencore/logging.h>
#	include <zencore/zencore.h>
#	include <zenhttp/httpclient.h>

#	include <atomic>
#	include <filesystem>
#	include <thread>

namespace zen {
class CidStore;
}

namespace zen::compute {

/** HTTP-based runner

	This implements a DDC remote compute execution strategy via REST API

 */

class RemoteHttpRunner : public FunctionRunner
{
	// Non-movable (also suppresses the implicit copy operations); the
	// monitor thread captures `this`
	RemoteHttpRunner(RemoteHttpRunner&&) = delete;
	RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;

public:
	RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName);
	~RemoteHttpRunner();

	// Stops the monitor thread and joins it
	virtual void Shutdown() override;
	// Ensures the remote end has the worker description and its attachments
	virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
	[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
	// Health is probed via the remote's /ready endpoint
	[[nodiscard]] virtual bool IsHealthy() override;
	[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
	// Remaining capacity: m_MaxRunningActions minus in-flight actions
	[[nodiscard]] virtual size_t QueryCapacity() override;
	[[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;

protected:
	LoggerRef Log() { return m_Log; }

private:
	LoggerRef m_Log;
	// Used to resolve attachment payloads the remote end reports missing
	ChunkResolver& m_ChunkResolver;
	std::string m_BaseUrl;
	HttpClient m_Http;

	int32_t m_MaxRunningActions = 256;	// arbitrary limit for testing

	// Book-keeping for one action submitted to the remote end
	struct HttpRunningAction
	{
		Ref<RunnerAction> Action;
		int RemoteActionLsn = 0;  // Remote LSN
		bool Success = false;
		CbPackage ActionResults;
	};

	RwLock m_RunningLock;
	std::unordered_map<int, HttpRunningAction> m_RemoteRunningMap;	// Note that this is keyed on the *REMOTE* lsn

	// Background thread polling the remote end for completed jobs
	std::thread m_MonitorThread;
	std::atomic<bool> m_MonitorThreadEnabled{true};
	Event m_MonitorThreadEvent;
	void MonitorThreadFunction();
	// Returns the number of actions retired during the sweep
	size_t SweepRunningActions();
};

} // namespace zen::compute

#endif
diff --git a/src/zencompute/xmake.lua b/src/zencompute/xmake.lua
new file mode 100644
index 000000000..c710b662d
--- /dev/null
+++ b/src/zencompute/xmake.lua
@@ -0,0 +1,11 @@
-- Copyright Epic Games, Inc. All Rights Reserved.

-- Static library target for the compute services
target('zencompute')
	set_kind("static")
	set_group("libs")
	add_headerfiles("**.h")
	add_files("**.cpp")
	add_includedirs("include", {public=true})
	add_deps("zencore", "zenstore", "zenutil", "zennet", "zenhttp")
	add_packages("vcpkg::gsl-lite")
	add_packages("vcpkg::spdlog", "vcpkg::cxxopts")
diff --git a/src/zencompute/zencompute.cpp b/src/zencompute/zencompute.cpp
new file mode 100644
index 000000000..633250f4e
--- /dev/null
+++ b/src/zencompute/zencompute.cpp
@@ -0,0 +1,12 @@
// Copyright Epic Games, Inc. All Rights Reserved.

#include "zencompute/zencompute.h"

namespace zen {

// Anchor symbol referenced to force this library's tests to be linked in
void
zencompute_forcelinktests()
{
}

} // namespace zen