// NOTE(review): this copy of the file looks extraction-damaged: line breaks are collapsed, the
// `# include` directives carry no header names, and template argument lists appear to be missing
// (e.g. `std::vector>`, `std::make_unique(WsUrl, ...)`, `std::unordered_map Remaining`). Restore the
// original text before changing any code. The notes below only describe what the visible statements do.
//
// RemoteHttpRunner (namespace zen::compute) submits actions to a remote compute endpoint over HTTP
// and tracks them until the remote reports them finished. Summary of the definitions that follow:
//
//   Constructor / destructor
//     Builds the base URL as "<HostName>/compute", takes a fresh instance id from Oid::NewOid(),
//     tries to connect a WebSocket client to HttpToWsUrl(HostName, "/compute/ws") with a 3000 ms
//     connect timeout, then starts the monitor thread. The destructor only calls Shutdown().
//
//   Shutdown
//     Clears m_AcceptNewActions, closes the WebSocket client, calls CancelRemoteQueue for every
//     entry copied out of m_RemoteQueueTokens, disables and joins the monitor thread, then swaps
//     out m_RemoteRunningMap and sets every remaining action to State::Failed.
//
//   RegisterWorker
//     GET /workers/<id>. On NotFound it POSTs the descriptor object; if that POST also answers
//     NotFound it reads the "need" hashes from the reply, removes every attachment that was not
//     requested and POSTs the reduced package. Returns false (after logging) on any non-success
//     status; returns true otherwise, including when the initial GET answers OK.
//
//   QueryCapacity
//     Returns 0 when not accepting actions or when m_RemoteRunningMap.size() has reached
//     m_MaxRunningActions, otherwise the difference between the two.
//
//   SubmitActions
//     With zero or one action it forwards to SubmitAction. Otherwise it calls EnsureRemoteQueue once
//     per distinct non-zero QueueId, groups the actions by QueueId, and sends each group through
//     SubmitActionBatch in chunks of m_MaxBatchSize, storing each result at the action's original
//     index.
//
//   SubmitAction
//     Rejects when shutting down (with a reason) or at capacity (without a reason). Otherwise it
//     records m_HostName as the execution location, picks "/jobs" or "/queues/<token>/jobs", and
//     POSTs the action object; a FailedDependency reply on the first attempt triggers
//     RegisterWorker and one retry. OK yields the result object. NotFound means attachments are
//     missing: the "need" hashes are resolved through m_ChunkResolver and re-posted as a package.
//     A non-zero "lsn" in the result registers the action in m_RemoteRunningMap and sets it Running.
//     NOTE(review): this accepts any non-zero "lsn", while ParseBatchResponse requires lsn > 0.
//
//   SubmitActionBatch
//     Same flow for a batch: posts {"actions": [...]}, handles OK via ParseBatchResponse, handles
//     NotFound by attaching the requested chunks and retrying once, and otherwise (or when a chunk
//     is missing, or the retry fails) falls back to FallbackToIndividualSubmit.
//     NOTE(review): the capacity check only compares the current running count with
//     m_MaxRunningActions; the size of the batch is not part of the comparison.
//
//   ParseBatchResponse
//     Walks the "results" array in order, pairing entry i with Actions[i]. lsn > 0 registers the
//     action and sets it Running; otherwise the entry's "error" string becomes the Reason. Missing
//     trailing entries are filled with "no result from server".
//
//   FallbackToIndividualSubmit
//     Enqueues one SubmitAction task per action on m_WorkerPool (EnableBacklog mode) and blocks on
//     each future in order.
//
//   EnsureRemoteQueue / CancelRemoteQueue
//     EnsureRemoteQueue returns the cached token for QueueId if present; otherwise it POSTs to
//     "/queues/remote" with an idempotency key of "<instance id>/<QueueId>", the machine name and
//     the optional metadata/config, and caches the parsed "queue_token". It returns Oid::Zero when
//     the request fails or the token does not parse. CancelRemoteQueue issues DELETE /queues/<token>
//     and logs a warning unless the status is NoContent.
//
//   IsHealthy / GetSubmittedActionCount
//     IsHealthy reports whether GET /ready produced a truthy response. GetSubmittedActionCount
//     returns m_RemoteRunningMap.size() under the shared lock.
//
//   OnWsOpen / OnWsMessage / OnWsClose
//     Open/close store true/false into m_WsConnected; any message just sets m_MonitorThreadEvent.
//
//   MonitorThreadFunction / SweepRunningActions
//     The monitor thread alternates between waiting on m_MonitorThreadEvent and sweeping. A sweep
//     GETs "/jobs/completed", fetches "/jobs/<lsn>" for each listed entry, moves matching entries
//     out of m_RemoteRunningMap, optionally lowers m_MaxRunningActions to max(4, lp_count), and
//     finally publishes results/states outside of any lock. It returns the number of retired actions.
//
// Copyright Epic Games, Inc. All Rights Reserved. #include "remotehttprunner.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include ////////////////////////////////////////////////////////////////////////// namespace zen::compute { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName, WorkerThreadPool& InWorkerPool) : FunctionRunner(BaseDir) , m_Log(logging::Get("http_exec")) , m_ChunkResolver{InChunkResolver} , m_WorkerPool{InWorkerPool} , m_HostName{HostName} , m_BaseUrl{fmt::format("{}/compute", HostName)} , m_Http(m_BaseUrl) , m_InstanceId(Oid::NewOid()) { // Attempt to connect a WebSocket for push-based completion notifications. // If the remote doesn't support WS, OnWsClose fires and we fall back to polling. { std::string WsUrl = HttpToWsUrl(HostName, "/compute/ws"); HttpWsClientSettings WsSettings; WsSettings.LogCategory = "http_exec_ws"; WsSettings.ConnectTimeout = std::chrono::milliseconds{3000}; IWsClientHandler& Handler = *this; m_WsClient = std::make_unique(WsUrl, Handler, WsSettings); m_WsClient->Connect(); } m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; } RemoteHttpRunner::~RemoteHttpRunner() { Shutdown(); } void RemoteHttpRunner::Shutdown() { m_AcceptNewActions = false; // Close the WebSocket client first, so no more wakeup signals arrive. if (m_WsClient) { m_WsClient->Close(); } // Cancel all known remote queues so the remote side stops scheduling new // work and cancels in-flight actions belonging to those queues. 
// (Shutdown, continued) The queue id/token pairs are copied out inside WithSharedLock; the
// CancelRemoteQueue HTTP calls run afterwards, once that lock callback has returned.
{ std::vector> Queues; m_QueueTokenLock.WithSharedLock([&] { Queues.assign(m_RemoteQueueTokens.begin(), m_RemoteQueueTokens.end()); }); for (const auto& [QueueId, Token] : Queues) { CancelRemoteQueue(QueueId); } } // Stop the monitor thread so it no longer polls the remote. m_MonitorThreadEnabled = false; m_MonitorThreadEvent.Set(); if (m_MonitorThread.joinable()) { m_MonitorThread.join(); } // Drain the running map and mark all remaining actions as Failed so the // scheduler can reschedule or finalize them. std::unordered_map Remaining; m_RunningLock.WithExclusiveLock([&] { Remaining.swap(m_RemoteRunningMap); }); for (auto& [RemoteLsn, HttpAction] : Remaining) { ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn); HttpAction.Action->SetActionState(RunnerAction::State::Failed); } } bool RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) { ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker"); const IoHash WorkerId = WorkerPackage.GetObjectHash(); CbPackage WorkerDesc = WorkerPackage; std::string WorkerUrl = fmt::format("/workers/{}", WorkerId); HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl); if (WorkerResponse.StatusCode == HttpResponseCode::NotFound) { HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject()); if (DescResponse.StatusCode == HttpResponseCode::NotFound) { CbPackage Pkg = WorkerDesc; // Build response package by sending only the attachments // the other end needs. We start with the full package and // remove the attachments which are not needed. 
// (RegisterWorker, continued) "need" in the POST reply lists the attachment hashes the remote asked
// for. Hashes to drop are collected first and removed in a second loop, and each removal is asserted
// to remove exactly one attachment.
{ std::unordered_set Needed; CbObject Response = DescResponse.AsObject(); for (auto& Item : Response["need"sv]) { const IoHash NeedHash = Item.AsHash(); Needed.insert(NeedHash); } std::unordered_set ToRemove; for (const CbAttachment& Attachment : Pkg.GetAttachments()) { const IoHash& Hash = Attachment.GetHash(); if (Needed.find(Hash) == Needed.end()) { ToRemove.insert(Hash); } } for (const IoHash& Hash : ToRemove) { int RemovedCount = Pkg.RemoveAttachment(Hash); ZEN_ASSERT(RemovedCount == 1); } } // Post resulting package HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg); if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); return false; } } else if (!IsHttpSuccessCode(DescResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); return false; } else { ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent); } } else if (WorkerResponse.StatusCode == HttpResponseCode::OK) { // Already known from a previous run } else if (!IsHttpSuccessCode(WorkerResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})", WorkerId, m_Http.GetBaseUri(), WorkerUrl, (int)WorkerResponse.StatusCode, ToString(WorkerResponse.StatusCode)); return false; } return true; } size_t RemoteHttpRunner::QueryCapacity() { if (!m_AcceptNewActions) { return 0; } // Estimate how much more work we're ready to accept RwLock::SharedLockScope _{m_RunningLock}; size_t RunningCount = m_RemoteRunningMap.size(); if (RunningCount >= size_t(m_MaxRunningActions)) { return 0; } return m_MaxRunningActions - RunningCount; } std::vector RemoteHttpRunner::SubmitActions(const std::vector>& Actions) { ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions"); if (Actions.size() <= 1) { std::vector Results; for (const Ref& Action : Actions) { Results.push_back(SubmitAction(Action)); } return 
Results; } // Collect distinct QueueIds and ensure remote queues exist once per queue std::unordered_map QueueTokens; // QueueId → remote token (0 stays as Zero) for (const Ref& Action : Actions) { const int QueueId = Action->QueueId; if (QueueId != 0 && QueueTokens.find(QueueId) == QueueTokens.end()) { CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId); CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId); QueueTokens[QueueId] = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig); } } // Group actions by QueueId struct QueueGroup { std::vector> Actions; std::vector OriginalIndices; }; std::unordered_map Groups; for (size_t i = 0; i < Actions.size(); ++i) { auto& Group = Groups[Actions[i]->QueueId]; Group.Actions.push_back(Actions[i]); Group.OriginalIndices.push_back(i); } // Submit each group as a batch and map results back to original indices std::vector Results(Actions.size()); for (auto& [QueueId, Group] : Groups) { std::string SubmitUrl = "/jobs"; if (QueueId != 0) { if (Oid Token = QueueTokens[QueueId]; Token != Oid::Zero) { SubmitUrl = fmt::format("/queues/{}/jobs", Token); } } const size_t BatchLimit = size_t(m_MaxBatchSize); for (size_t Offset = 0; Offset < Group.Actions.size(); Offset += BatchLimit) { size_t End = zen::Min(Offset + BatchLimit, Group.Actions.size()); std::vector> Chunk(Group.Actions.begin() + Offset, Group.Actions.begin() + End); std::vector ChunkResults = SubmitActionBatch(SubmitUrl, Chunk); for (size_t j = 0; j < ChunkResults.size(); ++j) { Results[Group.OriginalIndices[Offset + j]] = std::move(ChunkResults[j]); } } } return Results; } SubmitResult RemoteHttpRunner::SubmitAction(Ref Action) { ZEN_TRACE_CPU("RemoteHttpRunner::SubmitAction"); // Verify whether we can accept more work if (!m_AcceptNewActions) { return SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"}; } { RwLock::SharedLockScope _{m_RunningLock}; if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) { 
return SubmitResult{.IsAccepted = false}; } } using namespace std::literals; // Each enqueued action is assigned an integer index (logical sequence number), // which we use as a key for tracking data structures and as an opaque id which // may be used by clients to reference the scheduled action Action->ExecutionLocation = m_HostName; const int32_t ActionLsn = Action->ActionLsn; const CbObject& ActionObj = Action->ActionObj; const IoHash ActionId = ActionObj.GetHash(); MaybeDumpAction(ActionLsn, ActionObj); // Determine the submission URL. If the action belongs to a queue, ensure a // corresponding remote queue exists on the target node and submit via it. std::string SubmitUrl = "/jobs"; if (const int QueueId = Action->QueueId; QueueId != 0) { CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId); CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId); if (Oid Token = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig); Token != Oid::Zero) { SubmitUrl = fmt::format("/queues/{}/jobs", Token); } } // Enqueue job. If the remote returns FailedDependency (424), it means it // cannot resolve the worker/function — re-register the worker and retry once. 
// (SubmitAction, continued) The loop below posts at most twice. RegisterWorker's return value is
// explicitly discarded via (void), so the second POST is attempted whether or not re-registration
// succeeded.
// NOTE(review): a status other than OK/NotFound, or a reply without a non-zero "lsn", ends in
// `return {}` with no Reason set; confirm callers treat a default SubmitResult as a rejection.
CbObject Result; HttpClient::Response WorkResponse; HttpResponseCode WorkResponseCode{}; for (int Attempt = 0; Attempt < 2; ++Attempt) { WorkResponse = m_Http.Post(SubmitUrl, ActionObj); WorkResponseCode = WorkResponse.StatusCode; if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0) { ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying", m_Http.GetBaseUri(), ActionId); (void)RegisterWorker(Action->Worker.Descriptor); } else { break; } } if (WorkResponseCode == HttpResponseCode::OK) { Result = WorkResponse.AsObject(); } else if (WorkResponseCode == HttpResponseCode::NotFound) { // Not all attachments are present // Build response package including all required attachments CbPackage Pkg; Pkg.SetObject(ActionObj); CbObject Response = WorkResponse.AsObject(); for (auto& Item : Response["need"sv]) { const IoHash NeedHash = Item.AsHash(); if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) { uint64_t DataRawSize = 0; IoHash DataRawHash; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); } else { // No such attachment return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; } } // Post resulting package HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg); if (!PayloadResponse) { ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl); // TODO: include more information about the failure in the response return {.IsAccepted = false, .Reason = "HTTP request failed"}; } else if (PayloadResponse.StatusCode == HttpResponseCode::OK) { Result = PayloadResponse.AsObject(); } else { // Unexpected response const int ResponseStatusCode = (int)PayloadResponse.StatusCode; ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})", 
ActionId, m_Http.GetBaseUri(), SubmitUrl, ResponseStatusCode, ToString(ResponseStatusCode)); return {.IsAccepted = false, .Reason = fmt::format("unexpected response code {} {} from {}{}", ResponseStatusCode, ToString(ResponseStatusCode), m_Http.GetBaseUri(), SubmitUrl)}; } } if (Result) { if (const int32_t LsnField = Result["lsn"].AsInt32(0)) { HttpRunningAction NewAction; NewAction.Action = Action; NewAction.RemoteActionLsn = LsnField; { RwLock::ExclusiveLockScope _(m_RunningLock); m_RemoteRunningMap[LsnField] = std::move(NewAction); } ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn); Action->SetActionState(RunnerAction::State::Running); return SubmitResult{.IsAccepted = true}; } } return {}; } std::vector RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vector>& Actions) { ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActionBatch"); if (!m_AcceptNewActions) { return std::vector(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"}); } // Capacity check { RwLock::SharedLockScope _{m_RunningLock}; if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) { std::vector Results(Actions.size(), SubmitResult{.IsAccepted = false}); return Results; } } // Per-action setup and build batch body CbObjectWriter Body; Body.BeginArray("actions"sv); for (const Ref& Action : Actions) { Action->ExecutionLocation = m_HostName; MaybeDumpAction(Action->ActionLsn, Action->ActionObj); Body.AddObject(Action->ActionObj); } Body.EndArray(); // POST the batch HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save()); if (Response.StatusCode == HttpResponseCode::OK) { return ParseBatchResponse(Response, Actions); } if (Response.StatusCode == HttpResponseCode::NotFound) { // Server needs attachments — resolve them and retry with a CbPackage CbObject NeedObj = Response.AsObject(); CbPackage Pkg; Pkg.SetObject(Body.Save()); for (auto& Item : NeedObj["need"sv]) { const IoHash 
NeedHash = Item.AsHash(); if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) { uint64_t DataRawSize = 0; IoHash DataRawHash; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); ZEN_ASSERT(DataRawHash == NeedHash); Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); } else { ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash); return FallbackToIndividualSubmit(Actions); } } HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg); if (RetryResponse.StatusCode == HttpResponseCode::OK) { return ParseBatchResponse(RetryResponse, Actions); } ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit", (int)RetryResponse.StatusCode, ToString(RetryResponse.StatusCode)); return FallbackToIndividualSubmit(Actions); } // Unexpected status or connection error — fall back to individual submission if (Response) { ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl, (int)Response.StatusCode, ToString(Response.StatusCode)); } else { ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl); } return FallbackToIndividualSubmit(Actions); } std::vector RemoteHttpRunner::ParseBatchResponse(const HttpClient::Response& Response, const std::vector>& Actions) { std::vector Results; Results.reserve(Actions.size()); CbObject ResponseObj = Response.AsObject(); CbArrayView ResultArray = ResponseObj["results"sv].AsArrayView(); size_t Index = 0; for (CbFieldView Field : ResultArray) { if (Index >= Actions.size()) { break; } CbObjectView Entry = Field.AsObjectView(); const int32_t LsnField = Entry["lsn"sv].AsInt32(0); if (LsnField > 0) { HttpRunningAction NewAction; NewAction.Action = Actions[Index]; NewAction.RemoteActionLsn = LsnField; { RwLock::ExclusiveLockScope _(m_RunningLock); m_RemoteRunningMap[LsnField] = 
std::move(NewAction); } ZEN_DEBUG("batch: scheduled action {} with remote LSN {} (local LSN {})", Actions[Index]->ActionObj.GetHash(), LsnField, Actions[Index]->ActionLsn); Actions[Index]->SetActionState(RunnerAction::State::Running); Results.push_back(SubmitResult{.IsAccepted = true}); } else { std::string_view ErrorMsg = Entry["error"sv].AsString(); Results.push_back(SubmitResult{.IsAccepted = false, .Reason = std::string(ErrorMsg)}); } ++Index; } // If the server returned fewer results than actions, mark the rest as not accepted while (Results.size() < Actions.size()) { Results.push_back(SubmitResult{.IsAccepted = false, .Reason = "no result from server"}); } return Results; } std::vector RemoteHttpRunner::FallbackToIndividualSubmit(const std::vector>& Actions) { std::vector> Futures; Futures.reserve(Actions.size()); for (const Ref& Action : Actions) { std::packaged_task Task([this, Action]() { return SubmitAction(Action); }); Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog)); } std::vector Results; Results.reserve(Futures.size()); for (auto& Future : Futures) { Results.push_back(Future.get()); } return Results; } Oid RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config) { { RwLock::SharedLockScope _(m_QueueTokenLock); if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end()) { return It->second; } } // Build a stable idempotency key that uniquely identifies this (runner instance, local queue) // pair. The server uses this to return the same remote queue token for concurrent or redundant // requests, preventing orphaned remote queues when multiple threads race through here. // Also send hostname so the server can associate the queue with its origin for diagnostics. 
// (EnsureRemoteQueue, continued) The POST below is issued after the shared-lock scope above has
// closed and before the exclusive lock is taken. try_emplace leaves an already-stored token for
// QueueId in place, and the stored token is what gets returned.
CbObjectWriter Body; Body << "idempotency_key"sv << fmt::format("{}/{}", m_InstanceId, QueueId); Body << "hostname"sv << GetMachineName(); if (Metadata) { Body << "metadata"sv << Metadata; } if (Config) { Body << "config"sv << Config; } HttpClient::Response Resp = m_Http.Post("/queues/remote", Body.Save()); if (!Resp) { ZEN_WARN("failed to create remote queue for local queue {} on {}", QueueId, m_HostName); return Oid::Zero; } Oid Token = Oid::TryFromHexString(Resp.AsObject()["queue_token"sv].AsString()); if (Token == Oid::Zero) { return Oid::Zero; } ZEN_DEBUG("created remote queue '{}' for local queue {} on {}", Token, QueueId, m_HostName); RwLock::ExclusiveLockScope _(m_QueueTokenLock); auto [It, Inserted] = m_RemoteQueueTokens.try_emplace(QueueId, Token); return It->second; } void RemoteHttpRunner::CancelRemoteQueue(int QueueId) { Oid Token; { RwLock::SharedLockScope _(m_QueueTokenLock); if (auto It = m_RemoteQueueTokens.find(QueueId); It != m_RemoteQueueTokens.end()) { Token = It->second; } } if (Token == Oid::Zero) { return; } HttpClient::Response Resp = m_Http.Delete(fmt::format("/queues/{}", Token)); if (Resp.StatusCode == HttpResponseCode::NoContent) { ZEN_DEBUG("cancelled remote queue '{}' (local queue {}) on {}", Token, QueueId, m_HostName); } else { ZEN_WARN("failed to cancel remote queue '{}' on {}: {}", Token, m_HostName, int(Resp.StatusCode)); } } bool RemoteHttpRunner::IsHealthy() { if (HttpClient::Response Ready = m_Http.Get("/ready")) { return true; } else { // TODO: use response to propagate context return false; } } size_t RemoteHttpRunner::GetSubmittedActionCount() { RwLock::SharedLockScope _(m_RunningLock); return m_RemoteRunningMap.size(); } ////////////////////////////////////////////////////////////////////////// // // IWsClientHandler // void RemoteHttpRunner::OnWsOpen() { ZEN_INFO("WebSocket connected to {}", m_HostName); m_WsConnected.store(true, std::memory_order_release); } void RemoteHttpRunner::OnWsMessage([[maybe_unused]] const 
WebSocketMessage& Msg) { // The message content is a wakeup signal; no parsing needed. // Signal the monitor thread to sweep completed actions immediately. m_MonitorThreadEvent.Set(); } void RemoteHttpRunner::OnWsClose([[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) { ZEN_WARN("WebSocket disconnected from {} (code {})", m_HostName, Code); m_WsConnected.store(false, std::memory_order_release); } ////////////////////////////////////////////////////////////////////////// void RemoteHttpRunner::MonitorThreadFunction() { SetCurrentThreadName("RemoteHttpRunner_Monitor"); do { const int NormalWaitingTime = 200; const int WsWaitingTime = 2000; // Safety-net interval when WS is connected int WaitTimeMs = m_WsConnected.load(std::memory_order_relaxed) ? WsWaitingTime : NormalWaitingTime; auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); }; auto SweepOnce = [&] { const size_t RetiredCount = SweepRunningActions(); if (m_WsConnected.load(std::memory_order_relaxed)) { // WS connected: use long safety-net interval; the WS message // will wake us immediately for the real work. 
// (MonitorThreadFunction, continued) Without WS the next wait becomes NormalWaitingTime / 4 when
// more than 16 actions are tracked, NormalWaitingTime / 2 when the last sweep retired at least one
// action, and NormalWaitingTime otherwise.
// NOTE(review): no reset of m_MonitorThreadEvent is visible in this function; confirm the event
// clears itself after a successful Wait, otherwise the outer loop would stop blocking once signalled.
WaitTimeMs = WsWaitingTime; } else { // No WS: adaptive polling as before m_RunningLock.WithSharedLock([&] { if (m_RemoteRunningMap.size() > 16) { WaitTimeMs = NormalWaitingTime / 4; } else { if (RetiredCount) { WaitTimeMs = NormalWaitingTime / 2; } else { WaitTimeMs = NormalWaitingTime; } } }); } }; while (!WaitOnce()) { SweepOnce(); } // Signal received — may be a WS wakeup or a quit signal SweepOnce(); } while (m_MonitorThreadEnabled); } size_t RemoteHttpRunner::SweepRunningActions() { ZEN_TRACE_CPU("RemoteHttpRunner::SweepRunningActions"); std::vector CompletedActions; // Poll remote for list of completed actions HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv); if (CbObject Completed = ResponseCompleted.AsObject()) { for (auto& FieldIt : Completed["completed"sv]) { CbObjectView EntryObj = FieldIt.AsObjectView(); const int32_t CompleteLsn = EntryObj["lsn"sv].AsInt32(); std::string_view StateName = EntryObj["state"sv].AsString(); RunnerAction::State RemoteState = RunnerAction::FromString(StateName); // Always fetch to drain the result from the remote's results map, // but only keep the result package for successfully completed actions. 
// (SweepRunningActions, continued) The per-job GET is issued before m_RunningLock is taken, and it
// is issued for every listed LSN, including ones this runner does not track.
// NOTE(review): when that GET fails for an entry reported as Completed, ActionResults is left as it
// was, but SetResult is still called further down; confirm that is acceptable for the consumer.
HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn)); m_RunningLock.WithExclusiveLock([&] { if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end()) { HttpRunningAction CompletedAction = std::move(CompleteIt->second); CompletedAction.RemoteState = RemoteState; if (RemoteState == RunnerAction::State::Completed && ResponseJob) { CompletedAction.ActionResults = ResponseJob.AsPackage(); } CompletedActions.push_back(std::move(CompletedAction)); m_RemoteRunningMap.erase(CompleteIt); } else { // we received a completion notice for an action we don't know about, // this can happen if the runner is used by multiple upstream schedulers, // or if this compute node was recently restarted and lost track of // previously scheduled actions } }); } if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView()) { // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0)) if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0)) { const int32_t NewCap = zen::Max(4, CpuCount); if (m_MaxRunningActions > NewCap) { ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions); m_MaxRunningActions = NewCap; } } } } // Notify outer. Note that this has to be done without holding any local locks // otherwise we may end up with deadlocks. for (HttpRunningAction& HttpAction : CompletedActions) { const int ActionLsn = HttpAction.Action->ActionLsn; ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn, RunnerAction::ToString(HttpAction.RemoteState)); if (HttpAction.RemoteState == RunnerAction::State::Completed) { HttpAction.Action->SetResult(std::move(HttpAction.ActionResults)); } HttpAction.Action->SetActionState(HttpAction.RemoteState); } return CompletedActions.size(); } } // namespace zen::compute #endif