// Copyright Epic Games, Inc. All Rights Reserved.

// RemoteHttpRunner: a FunctionRunner that forwards actions and worker
// registrations to a remote compute node over HTTP (base URL "{HostName}/apply")
// and polls that node for completed jobs on a background monitor thread.

#include "remotehttprunner.h"

#if ZEN_WITH_COMPUTE_SERVICES

// NOTE(review): the targets of the include directives below are missing from
// this copy of the file (angle-bracketed text appears to have been stripped).
// Restore them from version control; they are left untouched here.
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include

//////////////////////////////////////////////////////////////////////////

namespace zen::compute {

using namespace std::literals;

//////////////////////////////////////////////////////////////////////////

// Builds the HTTP client for "{HostName}/apply" and starts the monitor thread.
// The thread runs MonitorThreadFunction until Shutdown() is called.
// NOTE(review): m_Http is initialized from m_BaseUrl, which is only correct if
// m_BaseUrl is declared before m_Http in the class. The header is not visible
// here; confirm the declaration order.
RemoteHttpRunner::RemoteHttpRunner(ChunkResolver&                InChunkResolver,
								   const std::filesystem::path& BaseDir,
								   std::string_view             HostName,
								   WorkerThreadPool&            InWorkerPool)
: FunctionRunner(BaseDir)
, m_Log(logging::Get("http_exec"))
, m_ChunkResolver{InChunkResolver}
, m_WorkerPool{InWorkerPool}
, m_HostName{HostName}
, m_BaseUrl{fmt::format("{}/apply", HostName)}
, m_Http(m_BaseUrl)
{
	// Started last, after every member initializer has run.
	m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}

// Stops and joins the monitor thread (via Shutdown).
RemoteHttpRunner::~RemoteHttpRunner()
{
	Shutdown();
}

// Asks the monitor thread to exit, wakes it, and joins it. The join is guarded
// by joinable(), so a second call (e.g. explicit Shutdown followed by the
// destructor) does not join twice. Actions already submitted to the remote
// node are not cancelled or drained.
void RemoteHttpRunner::Shutdown()
{
	// TODO: should cleanly drain/cancel pending work

	// Clear the flag before signalling so the monitor loop sees it once woken.
	m_MonitorThreadEnabled = false;
	m_MonitorThreadEvent.Set();

	if (m_MonitorThread.joinable())
	{
		m_MonitorThread.join();
	}
}

// Makes sure the remote node knows about the worker described by WorkerPackage.
//
// Protocol, as implemented below:
//   1. GET  /workers/{id}. A 200 means the worker is already registered.
//   2. On 404: POST the worker descriptor object alone.
//      - 204 No Content: the remote needs nothing further (asserted).
//      - 404: the response lists the attachment hashes it still "need"s. A
//        package containing only those attachments is then POSTed.
// Failures are logged with ZEN_ERROR and are not propagated to the caller.
void RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
{
	ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker");

	const IoHash WorkerId = WorkerPackage.GetObjectHash();
	// NOTE(review): WorkerDesc is never modified; this copy appears to be
	// unnecessary (WorkerPackage could be used directly).
	CbPackage WorkerDesc = WorkerPackage;

	std::string WorkerUrl = fmt::format("/workers/{}", WorkerId);

	HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl);

	if (WorkerResponse.StatusCode == HttpResponseCode::NotFound)
	{
		HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject());

		if (DescResponse.StatusCode == HttpResponseCode::NotFound)
		{
			// Mutable copy: attachments the remote already has are removed from it.
			CbPackage Pkg = WorkerDesc;

			// Build the response package by sending only the attachments the
			// other end needs. Start from the full package and remove every
			// attachment the remote did not list under "need".
			// NOTE(review): the set template arguments below appear to have been
			// stripped from this copy.
			{
				std::unordered_set Needed;
				CbObject Response = DescResponse.AsObject();

				for (auto& Item : Response["need"sv])
				{
					const IoHash NeedHash = Item.AsHash();

					Needed.insert(NeedHash);
				}

				// Collected first and removed afterwards, so Pkg's attachment list
				// is not modified while it is being iterated.
				std::unordered_set ToRemove;

				for (const CbAttachment& Attachment : Pkg.GetAttachments())
				{
					const IoHash& Hash = Attachment.GetHash();

					if (Needed.find(Hash) == Needed.end())
					{
						ToRemove.insert(Hash);
					}
				}

				for (const IoHash& Hash : ToRemove)
				{
					int RemovedCount = Pkg.RemoveAttachment(Hash);

					ZEN_ASSERT(RemovedCount == 1);
				}
			}

			// Post the trimmed package.

			HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg);

			if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
			{
				ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);

				// TODO: propagate error
			}
		}
		else if (!IsHttpSuccessCode(DescResponse.StatusCode))
		{
			ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);

			// TODO: propagate error
		}
		else
		{
			// NOTE(review): any other success code (e.g. 200 OK) trips this assert.
			ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent);
		}
	}
	else if (WorkerResponse.StatusCode == HttpResponseCode::OK)
	{
		// Already known from a previous run
	}
	else if (!IsHttpSuccessCode(WorkerResponse.StatusCode))
	{
		ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})",
				  WorkerId,
				  m_Http.GetBaseUri(),
				  WorkerUrl,
				  (int)WorkerResponse.StatusCode,
				  ToString(WorkerResponse.StatusCode));

		// TODO: propagate error
	}
}

// Returns how many more actions can be submitted before the running count
// reaches m_MaxRunningActions. Returns 0 when already at or above the cap.
// NOTE(review): SweepRunningActions writes m_MaxRunningActions on the monitor
// thread without holding m_RunningLock. Confirm the member is atomic
// (its declaration is not visible here).
size_t RemoteHttpRunner::QueryCapacity()
{
	// Estimate how much more work we're ready to accept
	RwLock::SharedLockScope _{m_RunningLock};

	size_t RunningCount = m_RemoteRunningMap.size();

	if (RunningCount >= size_t(m_MaxRunningActions))
	{
		return 0;
	}

	return m_MaxRunningActions - RunningCount;
}

// Submits a batch of actions. Returns one result per input action, in the same
// order as Actions.
// - Zero or one action: submitted synchronously on the calling thread.
// - Larger batches: each SubmitAction call runs as a task on m_WorkerPool, and
//   this function blocks until every task has produced its result.
//   Capturing `this` in the tasks is therefore safe.
// NOTE(review): template arguments in this function's signature and body appear
// to have been stripped from this copy (e.g. `std::vector>`).
std::vector RemoteHttpRunner::SubmitActions(const std::vector>& Actions)
{
	ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions");

	if (Actions.size() <= 1)
	{
		std::vector Results;

		for (const Ref& Action : Actions)
		{
			Results.push_back(SubmitAction(Action));
		}

		return Results;
	}

	// For larger batches, submit HTTP requests in parallel via the shared worker pool

	std::vector> Futures;
	Futures.reserve(Actions.size());

	for (const Ref& Action : Actions)
	{
		std::packaged_task Task([this, Action]() { return SubmitAction(Action); });

		Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
	}

	// Collect results in submission order.
	std::vector Results;
	Results.reserve(Futures.size());

	for (auto& Future : Futures)
	{
		Results.push_back(Future.get());
	}

	return Results;
}

// Submits a single action to the remote node.
//
// Flow:
//   1. Reject immediately if m_RemoteRunningMap is already at m_MaxRunningActions.
//   2. POST /jobs with the action object.
//      - 200: the response object is used as the result.
//      - 404: the response lists attachment hashes under "need". Each one is
//        resolved through m_ChunkResolver and attached, and the package is
//        POSTed to /jobs again. A missing chunk rejects the action with a reason.
//   3. If the result carries a non-zero "lsn", the action is tracked in
//      m_RemoteRunningMap under that remote LSN and marked Running.
//
// NOTE(review): the capacity check (step 1) and the insertion (step 3) use
// separate lock scopes. Concurrent calls from SubmitActions can therefore
// push the running count past m_MaxRunningActions; the limit is soft.
SubmitResult RemoteHttpRunner::SubmitAction(Ref Action)
{
	ZEN_TRACE_CPU("RemoteHttpRunner::SubmitAction");

	// Verify whether we can accept more work

	{
		RwLock::SharedLockScope _{m_RunningLock};
		if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
		{
			return SubmitResult{.IsAccepted = false};
		}
	}

	// NOTE(review): redundant; std::literals is already brought in at namespace scope.
	using namespace std::literals;

	// Each enqueued action is assigned an integer index (logical sequence number),
	// which we use as a key for tracking data structures and as an opaque id which
	// may be used by clients to reference the scheduled action

	const int32_t   ActionLsn = Action->ActionLsn;
	const CbObject& ActionObj = Action->ActionObj;
	const IoHash    ActionId  = ActionObj.GetHash();

	MaybeDumpAction(ActionLsn, ActionObj);

	// Enqueue job

	CbObject Result;

	HttpClient::Response WorkResponse     = m_Http.Post("/jobs", ActionObj);
	HttpResponseCode     WorkResponseCode = WorkResponse.StatusCode;

	if (WorkResponseCode == HttpResponseCode::OK)
	{
		Result = WorkResponse.AsObject();
	}
	else if (WorkResponseCode == HttpResponseCode::NotFound)
	{
		// Not all attachments are present on the remote.

		// Build a package containing the action plus every attachment it needs.
		CbPackage Pkg;
		Pkg.SetObject(ActionObj);

		CbObject Response = WorkResponse.AsObject();

		for (auto& Item : Response["need"sv])
		{
			const IoHash NeedHash = Item.AsHash();

			if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
			{
				uint64_t DataRawSize = 0;
				IoHash   DataRawHash;
				CompressedBuffer Compressed =
					CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);

				// The raw hash reported for the resolved chunk must match the requested one.
				ZEN_ASSERT(DataRawHash == NeedHash);

				Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
			}
			else
			{
				// No such attachment locally; the action cannot be submitted.
				return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)};
			}
		}

		// Post resulting package

		HttpClient::Response PayloadResponse = m_Http.Post("/jobs", Pkg);

		if (!PayloadResponse)
		{
			ZEN_WARN("unable to register payloads for action {} at {}/jobs", ActionId, m_Http.GetBaseUri());

			// TODO: include more information about the failure in the response

			return {.IsAccepted = false, .Reason = "HTTP request failed"};
		}
		else if (PayloadResponse.StatusCode == HttpResponseCode::OK)
		{
			Result = PayloadResponse.AsObject();
		}
		else
		{
			// Unexpected response
			// NOTE(review): ToString is called here with an int, whereas
			// RegisterWorker passes the HttpResponseCode enum. Verify which
			// overload is selected; ToString(PayloadResponse.StatusCode) looks intended.

			const int ResponseStatusCode = (int)PayloadResponse.StatusCode;

			ZEN_WARN("unable to register payloads for action {} at {}/jobs (error: {} {})",
					 ActionId,
					 m_Http.GetBaseUri(),
					 ResponseStatusCode,
					 ToString(ResponseStatusCode));

			return {.IsAccepted = false,
					.Reason     = fmt::format("unexpected response code {} {} from {}/jobs",
										  ResponseStatusCode,
										  ToString(ResponseStatusCode),
										  m_Http.GetBaseUri())};
		}
	}
	// NOTE(review): any other status code from the first POST is neither logged
	// nor given a Reason. It falls through to `return {}` below.

	if (Result)
	{
		// A missing or zero "lsn" is treated as "not scheduled".
		if (const int32_t LsnField = Result["lsn"].AsInt32(0))
		{
			HttpRunningAction NewAction;
			NewAction.Action          = Action;
			NewAction.RemoteActionLsn = LsnField;

			{
				RwLock::ExclusiveLockScope _(m_RunningLock);

				m_RemoteRunningMap[LsnField] = std::move(NewAction);
			}

			ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn);

			Action->SetActionState(RunnerAction::State::Running);

			return SubmitResult{.IsAccepted = true};
		}
	}

	return {};
}

// Returns true when GET /ready yields a response that evaluates as true.
// Returns false otherwise.
bool RemoteHttpRunner::IsHealthy()
{
	if (HttpClient::Response Ready = m_Http.Get("/ready"))
	{
		return true;
	}
	else
	{
		// TODO: use response to propagate context
		return false;
	}
}

// Number of actions currently tracked as running on the remote node.
size_t RemoteHttpRunner::GetSubmittedActionCount()
{
	RwLock::SharedLockScope _(m_RunningLock);
	return m_RemoteRunningMap.size();
}

// Monitor thread body. Each wait timeout triggers one SweepRunningActions pass.
// The wait interval adapts after every sweep:
//   - more than 16 actions in flight:  NormalWaitingTime / 4 (50 ms)
//   - some actions just retired:       NormalWaitingTime / 2 (100 ms)
//   - otherwise:                       NormalWaitingTime     (200 ms)
// When m_MonitorThreadEvent is signalled, one more sweep runs, and the thread
// exits if m_MonitorThreadEnabled has been cleared (see Shutdown).
void RemoteHttpRunner::MonitorThreadFunction()
{
	SetCurrentThreadName("RemoteHttpRunner_Monitor");

	do
	{
		const int NormalWaitingTime = 200;
		int       WaitTimeMs        = NormalWaitingTime;
		auto      WaitOnce          = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
		auto      SweepOnce         = [&] {
            const size_t RetiredCount = SweepRunningActions();

            m_RunningLock.WithSharedLock([&] {
                if (m_RemoteRunningMap.size() > 16)
                {
                    WaitTimeMs = NormalWaitingTime / 4;
                }
                else
                {
                    if (RetiredCount)
                    {
                        WaitTimeMs = NormalWaitingTime / 2;
                    }
                    else
                    {
                        WaitTimeMs = NormalWaitingTime;
                    }
                }
            });
		};

		// Sweep on every timeout until the event is signalled.
		while (!WaitOnce())
		{
			SweepOnce();
		}

		// Signal received - this may mean we should quit

		SweepOnce();
	} while (m_MonitorThreadEnabled);
}

// Polls the remote node for completed jobs. Returns how many tracked actions
// were retired in this pass.
//
// - GET /jobs/completed returns remote LSNs under "completed". For each one,
//   GET /jobs/{lsn} is issued. If that request succeeds and the LSN is tracked,
//   the entry is moved out of m_RemoteRunningMap with the response package
//   attached as its results.
// - If the response carries "metrics"/"lp_count", m_MaxRunningActions is
//   lowered to max(4, lp_count). It is only ever lowered here, never raised.
// - Completion callbacks (SetResult / SetActionState) run after the lock has
//   been released.
//
// NOTE(review): m_MaxRunningActions is read and written here without holding
// m_RunningLock, while other threads read it. Confirm it is atomic.
// NOTE(review): the vector's template argument appears to have been stripped
// from this copy.
size_t RemoteHttpRunner::SweepRunningActions()
{
	ZEN_TRACE_CPU("RemoteHttpRunner::SweepRunningActions");

	std::vector CompletedActions;

	// Poll remote for list of completed actions

	HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv);

	if (CbObject Completed = ResponseCompleted.AsObject())
	{
		for (auto& FieldIt : Completed["completed"sv])
		{
			const int32_t CompleteLsn = FieldIt.AsInt32();

			// If this fetch fails, the entry stays in m_RemoteRunningMap.
			// Whether the remote reports it again later is not visible here.
			if (HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn)))
			{
				m_RunningLock.WithExclusiveLock([&] {
					if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end())
					{
						HttpRunningAction CompletedAction = std::move(CompleteIt->second);
						CompletedAction.ActionResults     = ResponseJob.AsPackage();
						CompletedAction.Success           = true;

						CompletedActions.push_back(std::move(CompletedAction));
						m_RemoteRunningMap.erase(CompleteIt);
					}
					else
					{
						// We received a completion notice for an action we don't know about.
						// This can happen if the runner is used by multiple upstream schedulers,
						// or if this compute node was recently restarted and lost track of
						// previously scheduled actions.
					}
				});
			}
		}

		if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView())
		{
			// Alternative metric key, currently disabled:
			// if (const size_t CpuCount = Metrics["core_count"].AsInt32(0))
			if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0))
			{
				const int32_t NewCap = zen::Max(4, CpuCount);

				if (m_MaxRunningActions > NewCap)
				{
					ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions);

					m_MaxRunningActions = NewCap;
				}
			}
		}
	}

	// Notify outer. Note that this has to be done without holding any local locks,
	// otherwise we may end up with deadlocks.

	for (HttpRunningAction& HttpAction : CompletedActions)
	{
		const int ActionLsn = HttpAction.Action->ActionLsn;

		if (HttpAction.Success)
		{
			ZEN_DEBUG("completed: {} LSN {} (remote LSN {})", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn);

			HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
			HttpAction.Action->SetActionState(RunnerAction::State::Completed);
		}
		else
		{
			// NOTE(review): unreachable from this function. Every entry pushed
			// above has Success = true.
			HttpAction.Action->SetActionState(RunnerAction::State::Failed);
		}
	}

	return CompletedActions.size();
}

}  // namespace zen::compute

#endif