diff options
Diffstat (limited to 'src/zencompute/runners/remotehttprunner.cpp')
| -rw-r--r-- | src/zencompute/runners/remotehttprunner.cpp | 618 |
1 files changed, 618 insertions, 0 deletions
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp new file mode 100644 index 000000000..672636d06 --- /dev/null +++ b/src/zencompute/runners/remotehttprunner.cpp @@ -0,0 +1,618 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remotehttprunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/scopeguard.h> +# include <zencore/system.h> +# include <zencore/trace.h> +# include <zenhttp/httpcommon.h> +# include <zenstore/cidstore.h> + +# include <span> + +////////////////////////////////////////////////////////////////////////// + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, + const std::filesystem::path& BaseDir, + std::string_view HostName, + WorkerThreadPool& InWorkerPool) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("http_exec")) +, m_ChunkResolver{InChunkResolver} +, m_WorkerPool{InWorkerPool} +, m_HostName{HostName} +, m_BaseUrl{fmt::format("{}/compute", HostName)} +, m_Http(m_BaseUrl) +, m_InstanceId(Oid::NewOid()) +{ + m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; +} + +RemoteHttpRunner::~RemoteHttpRunner() +{ + Shutdown(); +} + +void +RemoteHttpRunner::Shutdown() +{ + // TODO: should cleanly drain/cancel pending work + + m_MonitorThreadEnabled = false; + m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } +} + +void +RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker"); + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + CbPackage WorkerDesc = WorkerPackage; + + std::string WorkerUrl = fmt::format("/workers/{}", WorkerId); + + HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl); + + if (WorkerResponse.StatusCode == HttpResponseCode::NotFound) + { + HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject()); + + if (DescResponse.StatusCode == HttpResponseCode::NotFound) + { + CbPackage Pkg = WorkerDesc; + + // Build response package by sending only the attachments + // the other end needs. We start with the full package and + // remove the attachments which are not needed. + + { + std::unordered_set<IoHash> Needed; + + CbObject Response = DescResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + Needed.insert(NeedHash); + } + + std::unordered_set<IoHash> ToRemove; + + for (const CbAttachment& Attachment : Pkg.GetAttachments()) + { + const IoHash& Hash = Attachment.GetHash(); + + if (Needed.find(Hash) == Needed.end()) + { + ToRemove.insert(Hash); + } + } + + for (const IoHash& Hash : ToRemove) + { + int RemovedCount = Pkg.RemoveAttachment(Hash); + + ZEN_ASSERT(RemovedCount == 1); + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg); + + if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + } + else if (!IsHttpSuccessCode(DescResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + else + { + ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent); + } + } + else if (WorkerResponse.StatusCode == HttpResponseCode::OK) + { + // Already known from a previous run + } + else if (!IsHttpSuccessCode(WorkerResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})", + WorkerId, + m_Http.GetBaseUri(), + WorkerUrl, + (int)WorkerResponse.StatusCode, + ToString(WorkerResponse.StatusCode)); + + // TODO: propagate error + } +} + +size_t +RemoteHttpRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + size_t RunningCount = m_RemoteRunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector<SubmitResult> +RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) +{ + ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions"); + + if (Actions.size() <= 1) + { + std::vector<SubmitResult> Results; + + for (const Ref<RunnerAction>& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; + } + + // For larger batches, submit HTTP requests in parallel via the shared worker pool + + std::vector<std::future<SubmitResult>> Futures; + Futures.reserve(Actions.size()); + + for (const Ref<RunnerAction>& Action : Actions) + { + std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); }); + + Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog)); + } + + std::vector<SubmitResult> Results; + Results.reserve(Futures.size()); + + for (auto& Future : Futures) + { + Results.push_back(Future.get()); + } + + return Results; +} + +SubmitResult +RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) +{ + ZEN_TRACE_CPU("RemoteHttpRunner::SubmitAction"); + + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + Action->ExecutionLocation = m_HostName; + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + // Determine the submission URL. If the action belongs to a queue, ensure a + // corresponding remote queue exists on the target node and submit via it. + + std::string SubmitUrl = "/jobs"; + if (const int QueueId = Action->QueueId; QueueId != 0) + { + CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId); + CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId); + if (Oid Token = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig); Token != Oid::Zero) + { + SubmitUrl = fmt::format("/queues/{}/jobs", Token); + } + } + + // Enqueue job. If the remote returns FailedDependency (424), it means it + // cannot resolve the worker/function — re-register the worker and retry once. + + CbObject Result; + HttpClient::Response WorkResponse; + HttpResponseCode WorkResponseCode{}; + + for (int Attempt = 0; Attempt < 2; ++Attempt) + { + WorkResponse = m_Http.Post(SubmitUrl, ActionObj); + WorkResponseCode = WorkResponse.StatusCode; + + if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0) + { + ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying", + m_Http.GetBaseUri(), + ActionId); + + RegisterWorker(Action->Worker.Descriptor); + } + else + { + break; + } + } + + if (WorkResponseCode == HttpResponseCode::OK) + { + Result = WorkResponse.AsObject(); + } + else if (WorkResponseCode == HttpResponseCode::NotFound) + { + // Not all attachments are present + + // Build response package including all required attachments + + CbPackage Pkg; + Pkg.SetObject(ActionObj); + + CbObject Response = WorkResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + { + uint64_t DataRawSize = 0; + IoHash DataRawHash; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); + + ZEN_ASSERT(DataRawHash == NeedHash); + + Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + } + else + { + // No such attachment + + return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg); + + if (!PayloadResponse) + { + ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl); + + // TODO: include more information about the failure in the response + + return {.IsAccepted = false, .Reason = "HTTP request failed"}; + } + else if (PayloadResponse.StatusCode == HttpResponseCode::OK) + { + Result = PayloadResponse.AsObject(); + } + else + { + // Unexpected response + + const int ResponseStatusCode = (int)PayloadResponse.StatusCode; + + ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})", + ActionId, + m_Http.GetBaseUri(), + SubmitUrl, + ResponseStatusCode, + ToString(ResponseStatusCode)); + + return {.IsAccepted = false, + .Reason = fmt::format("unexpected response code {} {} from {}{}", + ResponseStatusCode, + ToString(ResponseStatusCode), + m_Http.GetBaseUri(), + SubmitUrl)}; + } + } + + if (Result) + { + if (const int32_t LsnField = Result["lsn"].AsInt32(0)) + { + HttpRunningAction NewAction; + NewAction.Action = Action; + NewAction.RemoteActionLsn = LsnField; + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RemoteRunningMap[LsnField] = std::move(NewAction); + } + + ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn); + + Action->SetActionState(RunnerAction::State::Running); + + return SubmitResult{.IsAccepted = true}; + } + } + + return {}; +} + +Oid +RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config) +{ + { + RwLock::SharedLockScope _(m_QueueTokenLock); + if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end()) + { + return It->second; + } + } + + // Build a stable idempotency key that uniquely identifies this (runner instance, local queue) + // pair. The server uses this to return the same remote queue token for concurrent or redundant + // requests, preventing orphaned remote queues when multiple threads race through here. + // Also send hostname so the server can associate the queue with its origin for diagnostics. + CbObjectWriter Body; + Body << "idempotency_key"sv << fmt::format("{}/{}", m_InstanceId, QueueId); + Body << "hostname"sv << GetMachineName(); + if (Metadata) + { + Body << "metadata"sv << Metadata; + } + if (Config) + { + Body << "config"sv << Config; + } + + HttpClient::Response Resp = m_Http.Post("/queues/remote", Body.Save()); + if (!Resp) + { + ZEN_WARN("failed to create remote queue for local queue {} on {}", QueueId, m_HostName); + return Oid::Zero; + } + + Oid Token = Oid::TryFromHexString(Resp.AsObject()["queue_token"sv].AsString()); + if (Token == Oid::Zero) + { + return Oid::Zero; + } + + ZEN_DEBUG("created remote queue '{}' for local queue {} on {}", Token, QueueId, m_HostName); + + RwLock::ExclusiveLockScope _(m_QueueTokenLock); + auto [It, Inserted] = m_RemoteQueueTokens.try_emplace(QueueId, Token); + return It->second; +} + +void +RemoteHttpRunner::CancelRemoteQueue(int QueueId) +{ + Oid Token; + { + RwLock::SharedLockScope _(m_QueueTokenLock); + if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end()) + { + Token = It->second; + } + } + + if (Token == Oid::Zero) + { + return; + } + + HttpClient::Response Resp = m_Http.Delete(fmt::format("/queues/{}", Token)); + + if (Resp.StatusCode == HttpResponseCode::NoContent) + { + ZEN_DEBUG("cancelled remote queue '{}' (local queue {}) on {}", Token, QueueId, m_HostName); + } + else + { + ZEN_WARN("failed to cancel remote queue '{}' on {}: {}", Token, m_HostName, int(Resp.StatusCode)); + } +} + +bool +RemoteHttpRunner::IsHealthy() +{ + if (HttpClient::Response Ready = m_Http.Get("/ready")) + { + return true; + } + else + { + // TODO: use response to propagate context + return false; + } +} + +size_t +RemoteHttpRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RemoteRunningMap.size(); +} + +void +RemoteHttpRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("RemoteHttpRunner_Monitor"); + + do + { + const int NormalWaitingTime = 200; + int WaitTimeMs = NormalWaitingTime; + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); }; + auto SweepOnce = [&] { + const size_t RetiredCount = SweepRunningActions(); + + m_RunningLock.WithSharedLock([&] { + if (m_RemoteRunningMap.size() > 16) + { + WaitTimeMs = NormalWaitingTime / 4; + } + else + { + if (RetiredCount) + { + WaitTimeMs = NormalWaitingTime / 2; + } + else + { + WaitTimeMs = NormalWaitingTime; + } + } + }); + }; + + while (!WaitOnce()) + { + SweepOnce(); + } + + // Signal received - this may mean we should quit + + SweepOnce(); + } while (m_MonitorThreadEnabled); +} + +size_t +RemoteHttpRunner::SweepRunningActions() +{ + ZEN_TRACE_CPU("RemoteHttpRunner::SweepRunningActions"); + std::vector<HttpRunningAction> CompletedActions; + + // Poll remote for list of completed actions + + HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv); + + if (CbObject Completed = ResponseCompleted.AsObject()) + { + for (auto& FieldIt : Completed["completed"sv]) + { + CbObjectView EntryObj = FieldIt.AsObjectView(); + const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32(); + std::string_view StateName = EntryObj["state"sv].AsString(); + + RunnerAction::State RemoteState = RunnerAction::FromString(StateName); + + // Always fetch to drain the result from the remote's results map, + // but only keep the result package for successfully completed actions. + HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn)); + + m_RunningLock.WithExclusiveLock([&] { + if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end()) + { + HttpRunningAction CompletedAction = std::move(CompleteIt->second); + CompletedAction.RemoteState = RemoteState; + + if (RemoteState == RunnerAction::State::Completed && ResponseJob) + { + CompletedAction.ActionResults = ResponseJob.AsPackage(); + } + + CompletedActions.push_back(std::move(CompletedAction)); + m_RemoteRunningMap.erase(CompleteIt); + } + else + { + // we received a completion notice for an action we don't know about, + // this can happen if the runner is used by multiple upstream schedulers, + // or if this compute node was recently restarted and lost track of + // previously scheduled actions + } + }); + } + + if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView()) + { + // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0)) + if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0)) + { + const int32_t NewCap = zen::Max(4, CpuCount); + + if (m_MaxRunningActions > NewCap) + { + ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions); + + m_MaxRunningActions = NewCap; + } + } + } + } + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (HttpRunningAction& HttpAction : CompletedActions) + { + const int ActionLsn = HttpAction.Action->ActionLsn; + + ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}", + HttpAction.Action->ActionId, + ActionLsn, + HttpAction.RemoteActionLsn, + RunnerAction::ToString(HttpAction.RemoteState)); + + if (HttpAction.RemoteState == RunnerAction::State::Completed) + { + HttpAction.Action->SetResult(std::move(HttpAction.ActionResults)); + } + + HttpAction.Action->SetActionState(HttpAction.RemoteState); + } + + return CompletedActions.size(); +} + +} // namespace zen::compute + +#endif |