// Copyright Epic Games, Inc. All Rights Reserved. #include "remotehttprunner.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include ////////////////////////////////////////////////////////////////////////// namespace zen::compute { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName, WorkerThreadPool& InWorkerPool) : FunctionRunner(BaseDir) , m_Log(logging::Get("http_exec")) , m_ChunkResolver{InChunkResolver} , m_WorkerPool{InWorkerPool} , m_HostName{HostName} , m_BaseUrl{fmt::format("{}/compute", HostName)} , m_Http(m_BaseUrl) , m_InstanceId(Oid::NewOid()) { m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; } RemoteHttpRunner::~RemoteHttpRunner() { Shutdown(); } void RemoteHttpRunner::Shutdown() { // TODO: should cleanly drain/cancel pending work m_MonitorThreadEnabled = false; m_MonitorThreadEvent.Set(); if (m_MonitorThread.joinable()) { m_MonitorThread.join(); } } void RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) { ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker"); const IoHash WorkerId = WorkerPackage.GetObjectHash(); CbPackage WorkerDesc = WorkerPackage; std::string WorkerUrl = fmt::format("/workers/{}", WorkerId); HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl); if (WorkerResponse.StatusCode == HttpResponseCode::NotFound) { HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject()); if (DescResponse.StatusCode == HttpResponseCode::NotFound) { CbPackage Pkg = WorkerDesc; // Build response package by sending only the attachments // the other end needs. We start with the full package and // remove the attachments which are not needed. { std::unordered_set Needed; CbObject Response = DescResponse.AsObject(); for (auto& Item : Response["need"sv]) { const IoHash NeedHash = Item.AsHash(); Needed.insert(NeedHash); } std::unordered_set ToRemove; for (const CbAttachment& Attachment : Pkg.GetAttachments()) { const IoHash& Hash = Attachment.GetHash(); if (Needed.find(Hash) == Needed.end()) { ToRemove.insert(Hash); } } for (const IoHash& Hash : ToRemove) { int RemovedCount = Pkg.RemoveAttachment(Hash); ZEN_ASSERT(RemovedCount == 1); } } // Post resulting package HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg); if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); // TODO: propagate error } } else if (!IsHttpSuccessCode(DescResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); // TODO: propagate error } else { ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent); } } else if (WorkerResponse.StatusCode == HttpResponseCode::OK) { // Already known from a previous run } else if (!IsHttpSuccessCode(WorkerResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})", WorkerId, m_Http.GetBaseUri(), WorkerUrl, (int)WorkerResponse.StatusCode, ToString(WorkerResponse.StatusCode)); // TODO: propagate error } } size_t RemoteHttpRunner::QueryCapacity() { // Estimate how much more work we're ready to accept RwLock::SharedLockScope _{m_RunningLock}; size_t RunningCount = m_RemoteRunningMap.size(); if (RunningCount >= size_t(m_MaxRunningActions)) { return 0; } return m_MaxRunningActions - RunningCount; } std::vector RemoteHttpRunner::SubmitActions(const std::vector>& Actions) { ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions"); if (Actions.size() <= 1) { std::vector Results; for (const Ref& Action : Actions) { Results.push_back(SubmitAction(Action)); } return Results; } // For larger batches, submit HTTP requests in parallel via the shared worker pool std::vector> Futures; Futures.reserve(Actions.size()); for (const Ref& Action : Actions) { std::packaged_task Task([this, Action]() { return SubmitAction(Action); }); Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog)); } std::vector Results; Results.reserve(Futures.size()); for (auto& Future : Futures) { Results.push_back(Future.get()); } return Results; } SubmitResult RemoteHttpRunner::SubmitAction(Ref Action) { ZEN_TRACE_CPU("RemoteHttpRunner::SubmitAction"); // Verify whether we can accept more work { RwLock::SharedLockScope _{m_RunningLock}; if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) { return SubmitResult{.IsAccepted = false}; } } using namespace std::literals; // Each enqueued action is assigned an integer index (logical sequence number), // which we use as a key for tracking data structures and as an opaque id which // may be used by clients to reference the scheduled action Action->ExecutionLocation = m_HostName; const int32_t ActionLsn = Action->ActionLsn; const CbObject& ActionObj = Action->ActionObj; const IoHash ActionId = ActionObj.GetHash(); MaybeDumpAction(ActionLsn, ActionObj); // Determine the submission URL. If the action belongs to a queue, ensure a // corresponding remote queue exists on the target node and submit via it. std::string SubmitUrl = "/jobs"; if (const int QueueId = Action->QueueId; QueueId != 0) { CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId); CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId); if (Oid Token = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig); Token != Oid::Zero) { SubmitUrl = fmt::format("/queues/{}/jobs", Token); } } // Enqueue job. If the remote returns FailedDependency (424), it means it // cannot resolve the worker/function — re-register the worker and retry once. CbObject Result; HttpClient::Response WorkResponse; HttpResponseCode WorkResponseCode{}; for (int Attempt = 0; Attempt < 2; ++Attempt) { WorkResponse = m_Http.Post(SubmitUrl, ActionObj); WorkResponseCode = WorkResponse.StatusCode; if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0) { ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying", m_Http.GetBaseUri(), ActionId); RegisterWorker(Action->Worker.Descriptor); } else { break; } } if (WorkResponseCode == HttpResponseCode::OK) { Result = WorkResponse.AsObject(); } else if (WorkResponseCode == HttpResponseCode::NotFound) { // Not all attachments are present // Build response package including all required attachments CbPackage Pkg; Pkg.SetObject(ActionObj); CbObject Response = WorkResponse.AsObject(); for (auto& Item : Response["need"sv]) { const IoHash NeedHash = Item.AsHash(); if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) { uint64_t DataRawSize = 0; IoHash DataRawHash; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); } else { // No such attachment return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; } } // Post resulting package HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg); if (!PayloadResponse) { ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl); // TODO: include more information about the failure in the response return {.IsAccepted = false, .Reason = "HTTP request failed"}; } else if (PayloadResponse.StatusCode == HttpResponseCode::OK) { Result = PayloadResponse.AsObject(); } else { // Unexpected response const int ResponseStatusCode = (int)PayloadResponse.StatusCode; ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})", ActionId, m_Http.GetBaseUri(), SubmitUrl, ResponseStatusCode, ToString(ResponseStatusCode)); return {.IsAccepted = false, .Reason = fmt::format("unexpected response code {} {} from {}{}", ResponseStatusCode, ToString(ResponseStatusCode), m_Http.GetBaseUri(), SubmitUrl)}; } } if (Result) { if (const int32_t LsnField = Result["lsn"].AsInt32(0)) { HttpRunningAction NewAction; NewAction.Action = Action; NewAction.RemoteActionLsn = LsnField; { RwLock::ExclusiveLockScope _(m_RunningLock); m_RemoteRunningMap[LsnField] = std::move(NewAction); } ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn); Action->SetActionState(RunnerAction::State::Running); return SubmitResult{.IsAccepted = true}; } } return {}; } Oid RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config) { { RwLock::SharedLockScope _(m_QueueTokenLock); if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end()) { return It->second; } } // Build a stable idempotency key that uniquely identifies this (runner instance, local queue) // pair. The server uses this to return the same remote queue token for concurrent or redundant // requests, preventing orphaned remote queues when multiple threads race through here. // Also send hostname so the server can associate the queue with its origin for diagnostics. CbObjectWriter Body; Body << "idempotency_key"sv << fmt::format("{}/{}", m_InstanceId, QueueId); Body << "hostname"sv << GetMachineName(); if (Metadata) { Body << "metadata"sv << Metadata; } if (Config) { Body << "config"sv << Config; } HttpClient::Response Resp = m_Http.Post("/queues/remote", Body.Save()); if (!Resp) { ZEN_WARN("failed to create remote queue for local queue {} on {}", QueueId, m_HostName); return Oid::Zero; } Oid Token = Oid::TryFromHexString(Resp.AsObject()["queue_token"sv].AsString()); if (Token == Oid::Zero) { return Oid::Zero; } ZEN_DEBUG("created remote queue '{}' for local queue {} on {}", Token, QueueId, m_HostName); RwLock::ExclusiveLockScope _(m_QueueTokenLock); auto [It, Inserted] = m_RemoteQueueTokens.try_emplace(QueueId, Token); return It->second; } void RemoteHttpRunner::CancelRemoteQueue(int QueueId) { Oid Token; { RwLock::SharedLockScope _(m_QueueTokenLock); if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end()) { Token = It->second; } } if (Token == Oid::Zero) { return; } HttpClient::Response Resp = m_Http.Delete(fmt::format("/queues/{}", Token)); if (Resp.StatusCode == HttpResponseCode::NoContent) { ZEN_DEBUG("cancelled remote queue '{}' (local queue {}) on {}", Token, QueueId, m_HostName); } else { ZEN_WARN("failed to cancel remote queue '{}' on {}: {}", Token, m_HostName, int(Resp.StatusCode)); } } bool RemoteHttpRunner::IsHealthy() { if (HttpClient::Response Ready = m_Http.Get("/ready")) { return true; } else { // TODO: use response to propagate context return false; } } size_t RemoteHttpRunner::GetSubmittedActionCount() { RwLock::SharedLockScope _(m_RunningLock); return m_RemoteRunningMap.size(); } void RemoteHttpRunner::MonitorThreadFunction() { SetCurrentThreadName("RemoteHttpRunner_Monitor"); do { const int NormalWaitingTime = 200; int WaitTimeMs = NormalWaitingTime; auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); }; auto SweepOnce = [&] { const size_t RetiredCount = SweepRunningActions(); m_RunningLock.WithSharedLock([&] { if (m_RemoteRunningMap.size() > 16) { WaitTimeMs = NormalWaitingTime / 4; } else { if (RetiredCount) { WaitTimeMs = NormalWaitingTime / 2; } else { WaitTimeMs = NormalWaitingTime; } } }); }; while (!WaitOnce()) { SweepOnce(); } // Signal received - this may mean we should quit SweepOnce(); } while (m_MonitorThreadEnabled); } size_t RemoteHttpRunner::SweepRunningActions() { ZEN_TRACE_CPU("RemoteHttpRunner::SweepRunningActions"); std::vector CompletedActions; // Poll remote for list of completed actions HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv); if (CbObject Completed = ResponseCompleted.AsObject()) { for (auto& FieldIt : Completed["completed"sv]) { CbObjectView EntryObj = FieldIt.AsObjectView(); const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32(); std::string_view StateName = EntryObj["state"sv].AsString(); RunnerAction::State RemoteState = RunnerAction::FromString(StateName); // Always fetch to drain the result from the remote's results map, // but only keep the result package for successfully completed actions. HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn)); m_RunningLock.WithExclusiveLock([&] { if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end()) { HttpRunningAction CompletedAction = std::move(CompleteIt->second); CompletedAction.RemoteState = RemoteState; if (RemoteState == RunnerAction::State::Completed && ResponseJob) { CompletedAction.ActionResults = ResponseJob.AsPackage(); } CompletedActions.push_back(std::move(CompletedAction)); m_RemoteRunningMap.erase(CompleteIt); } else { // we received a completion notice for an action we don't know about, // this can happen if the runner is used by multiple upstream schedulers, // or if this compute node was recently restarted and lost track of // previously scheduled actions } }); } if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView()) { // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0)) if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0)) { const int32_t NewCap = zen::Max(4, CpuCount); if (m_MaxRunningActions > NewCap) { ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions); m_MaxRunningActions = NewCap; } } } } // Notify outer. Note that this has to be done without holding any local locks // otherwise we may end up with deadlocks. for (HttpRunningAction& HttpAction : CompletedActions) { const int ActionLsn = HttpAction.Action->ActionLsn; ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn, RunnerAction::ToString(HttpAction.RemoteState)); if (HttpAction.RemoteState == RunnerAction::State::Completed) { HttpAction.Action->SetResult(std::move(HttpAction.ActionResults)); } HttpAction.Action->SetActionState(HttpAction.RemoteState); } return CompletedActions.size(); } } // namespace zen::compute #endif