diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-30 14:15:15 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-30 14:15:15 -0700 |
| commit | eb33c52b8e338b1bccf0d9d26b56d7ef611f6059 (patch) | |
| tree | 994b2af87e7b0cfba3250d41227c94d777738dc4 /zenserver/compute/function.cpp | |
| parent | Retain flags for small objects in structured cache (#68) (diff) | |
| download | zen-eb33c52b8e338b1bccf0d9d26b56d7ef611f6059.tar.xz zen-eb33c52b8e338b1bccf0d9d26b56d7ef611f6059.zip | |
Simple file-based compute (#65)
Diffstat (limited to 'zenserver/compute/function.cpp')
| -rw-r--r-- | zenserver/compute/function.cpp | 149 |
1 files changed, 147 insertions, 2 deletions
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp index 9af3efcec..996573573 100644 --- a/zenserver/compute/function.cpp +++ b/zenserver/compute/function.cpp @@ -242,6 +242,65 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, HttpVerb::kGet); m_Router.RegisterRoute( + "simple/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker; + + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + else + { + Worker = It->second; + } + } + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + + { + RwLock::SharedLockScope _(m_WorkerLock); + m_WorkerMap.erase(WorkerId); + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + + case HttpVerb::kPost: + { + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -403,6 +462,92 @@ HttpFunctionService::HandleRequest(HttpServerRequest& Request) } HttpResponseCode +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + + ZEN_INFO("Action {} being processed...", WorkerId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple}); + if (!EnqueueResult.Success) + { + ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString()); + return HttpResponseCode::InternalServerError; + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + + Object = Writer.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object) +{ + const static IoHash Empty = CbObject().GetHash(); + auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty); + if (!Status.Success) + { + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + + if (!Completed.Success) + { + ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", + WorkerId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason, + Completed.Error.ErrorCode); + + if (Completed.Error.ErrorCode == 0) + { + Completed.Error.ErrorCode = -1; + } + if (Completed.StdErr.empty() && !Completed.Error.Reason.empty()) + { + Completed.StdErr = Completed.Error.Reason; + } + } + else + { + ZEN_INFO("Action {} completed with {} files ExitCode={}", + WorkerId.ToHexString(), + Completed.OutputFiles.size(), + Completed.Error.ErrorCode); + } + + CbObjectWriter ResultObject; + + ResultObject.AddString("agent"sv, Completed.Agent); + ResultObject.AddString("detail"sv, Completed.Detail); + ResultObject.AddString("stdout"sv, Completed.StdOut); + ResultObject.AddString("stderr"sv, Completed.StdErr); + ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode); + + ResultObject.BeginArray("files"sv); + for (const auto& File : Completed.OutputFiles) + { + ResultObject.BeginObject(); + ResultObject.AddString("name"sv, File.first.string()); + ResultObject.AddBinary("data"sv, Completed.FileData[File.second]); + ResultObject.EndObject(); + } + ResultObject.EndArray(); + + Object = ResultObject.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) { const IoHash WorkerId = Worker.Descriptor.GetHash(); @@ -412,7 +557,8 @@ HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Actio ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream( + {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset}); if (!EnqueueResult.Success) { @@ -434,7 +580,6 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHa auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); if (!Status.Success) { - // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); return HttpResponseCode::NotFound; } |