aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/function.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/compute/function.cpp')
-rw-r--r--zenserver/compute/function.cpp149
1 files changed, 147 insertions, 2 deletions
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
index 9af3efcec..996573573 100644
--- a/zenserver/compute/function.cpp
+++ b/zenserver/compute/function.cpp
@@ -242,6 +242,65 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "simple/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ WorkerDesc Worker;
+
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ else
+ {
+ Worker = It->second;
+ }
+ }
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbObject Output;
+ HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+ m_WorkerMap.erase(WorkerId);
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ CbObject Output;
+ HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"jobs/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -403,6 +462,92 @@ HttpFunctionService::HandleRequest(HttpServerRequest& Request)
}
HttpResponseCode
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object)
+{
+ const IoHash WorkerId = Worker.Descriptor.GetHash();
+
+ ZEN_INFO("Action {} being processed...", WorkerId.ToHexString());
+
+ auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple});
+ if (!EnqueueResult.Success)
+ {
+ ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString());
+ return HttpResponseCode::InternalServerError;
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddHash("worker", WorkerId);
+
+ Object = Writer.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object)
+{
+ const static IoHash Empty = CbObject().GetHash();
+ auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty);
+ if (!Status.Success)
+ {
+ return HttpResponseCode::NotFound;
+ }
+
+ if (Status.Status.State != UpstreamApplyState::Complete)
+ {
+ return HttpResponseCode::Accepted;
+ }
+
+ GetUpstreamApplyResult& Completed = Status.Status.Result;
+
+ if (!Completed.Success)
+ {
+ ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
+ WorkerId.ToHexString(),
+ Completed.StdOut,
+ Completed.StdErr,
+ Completed.Error.Reason,
+ Completed.Error.ErrorCode);
+
+ if (Completed.Error.ErrorCode == 0)
+ {
+ Completed.Error.ErrorCode = -1;
+ }
+ if (Completed.StdErr.empty() && !Completed.Error.Reason.empty())
+ {
+ Completed.StdErr = Completed.Error.Reason;
+ }
+ }
+ else
+ {
+ ZEN_INFO("Action {} completed with {} files ExitCode={}",
+ WorkerId.ToHexString(),
+ Completed.OutputFiles.size(),
+ Completed.Error.ErrorCode);
+ }
+
+ CbObjectWriter ResultObject;
+
+ ResultObject.AddString("agent"sv, Completed.Agent);
+ ResultObject.AddString("detail"sv, Completed.Detail);
+ ResultObject.AddString("stdout"sv, Completed.StdOut);
+ ResultObject.AddString("stderr"sv, Completed.StdErr);
+ ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode);
+
+ ResultObject.BeginArray("files"sv);
+ for (const auto& File : Completed.OutputFiles)
+ {
+ ResultObject.BeginObject();
+ ResultObject.AddString("name"sv, File.first.string());
+ ResultObject.AddBinary("data"sv, Completed.FileData[File.second]);
+ ResultObject.EndObject();
+ }
+ ResultObject.EndArray();
+
+ Object = ResultObject.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
{
const IoHash WorkerId = Worker.Descriptor.GetHash();
@@ -412,7 +557,8 @@ HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Actio
ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
- auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)});
+ auto EnqueueResult = m_UpstreamApply->EnqueueUpstream(
+ {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset});
if (!EnqueueResult.Success)
{
@@ -434,7 +580,6 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHa
auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
if (!Status.Success)
{
- // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
return HttpResponseCode::NotFound;
}