diff options
| author | Per Larsson <[email protected]> | 2022-04-25 11:22:43 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2022-04-25 11:22:43 +0200 |
| commit | 1e7c5d062327ec249d3daae0f9ce61a03c4e76b2 (patch) | |
| tree | 893423c3d0b28f1d08b7ce1b138f4e1b4e45537c /zenserver/compute/function.cpp | |
| parent | Merge branch 'main' into ddcref (diff) | |
| parent | Compute tweaks (#78) (diff) | |
| download | zen-1e7c5d062327ec249d3daae0f9ce61a03c4e76b2.tar.xz zen-1e7c5d062327ec249d3daae0f9ce61a03c4e76b2.zip | |
Merge branch 'main' into ddcref
Diffstat (limited to 'zenserver/compute/function.cpp')
| -rw-r--r-- | zenserver/compute/function.cpp | 168 |
1 files changed, 166 insertions, 2 deletions
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp index 9af3efcec..dd31013ef 100644 --- a/zenserver/compute/function.cpp +++ b/zenserver/compute/function.cpp @@ -54,6 +54,16 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( + "ready", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + // Todo: check upstream health + return HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -242,6 +252,65 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, HttpVerb::kGet); m_Router.RegisterRoute( + "simple/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker; + + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + else + { + Worker = It->second; + } + } + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + + { + RwLock::SharedLockScope _(m_WorkerLock); + m_WorkerMap.erase(WorkerId); + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + + case HttpVerb::kPost: + { + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -403,6 +472,101 @@ HttpFunctionService::HandleRequest(HttpServerRequest& Request) } HttpResponseCode +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + + ZEN_INFO("Action {} being processed...", WorkerId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple}); + if (!EnqueueResult.Success) + { + ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString()); + return HttpResponseCode::InternalServerError; + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + + Object = Writer.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object) +{ + const static IoHash Empty = CbObject().GetHash(); + auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty); + if (!Status.Success) + { + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + + if (!Completed.Success) + { + ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", + WorkerId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason, + Completed.Error.ErrorCode); + + if (Completed.Error.ErrorCode == 0) + { + Completed.Error.ErrorCode = -1; + } + if (Completed.StdErr.empty() && !Completed.Error.Reason.empty()) + { + Completed.StdErr = Completed.Error.Reason; + } + } + else + { + ZEN_INFO("Action {} completed with {} files ExitCode={}", + WorkerId.ToHexString(), + Completed.OutputFiles.size(), + Completed.Error.ErrorCode); + } + + CbObjectWriter ResultObject; + + ResultObject.AddString("agent"sv, Completed.Agent); + ResultObject.AddString("detail"sv, Completed.Detail); + ResultObject.AddString("stdout"sv, Completed.StdOut); + ResultObject.AddString("stderr"sv, Completed.StdErr); + ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode); + ResultObject.BeginArray("stats"sv); + for (const auto& Timepoint : Completed.Timepoints) + { + ResultObject.BeginObject(); + ResultObject.AddString("name"sv, Timepoint.first); + ResultObject.AddDateTimeTicks("time"sv, Timepoint.second); + ResultObject.EndObject(); + } + ResultObject.EndArray(); + + ResultObject.BeginArray("files"sv); + for (const auto& File : Completed.OutputFiles) + { + ResultObject.BeginObject(); + ResultObject.AddString("name"sv, File.first.string()); + ResultObject.AddBinary("data"sv, Completed.FileData[File.second]); + ResultObject.EndObject(); + } + ResultObject.EndArray(); + + Object = ResultObject.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) { const IoHash WorkerId = Worker.Descriptor.GetHash(); @@ -412,7 +576,8 @@ HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Actio ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream( + {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset}); if (!EnqueueResult.Success) { @@ -434,7 +599,6 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHa auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); if (!Status.Success) { - // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); return HttpResponseCode::NotFound; } |