// Copyright Epic Games, Inc. All Rights Reserved. #include "function.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include using namespace std::literals; namespace zen { HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const CloudCacheClientOptions& ComputeOptions, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& ComputeAuthConfig, const UpstreamAuthConfig& StorageAuthConfig, AuthMgr& Mgr) : m_Log(logging::Get("apply")) , m_CasStore(Store) , m_CidStore(InCidStore) { m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, ComputeAuthConfig, StorageOptions, StorageAuthConfig, m_CasStore, m_CidStore, Mgr); m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); m_UpstreamApply->Initialize(); m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "ready", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); // Todo: check upstream health return HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kGet); m_Router.RegisterRoute( "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { const WorkerDesc& Desc = It->second; return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); } } break; case HttpVerb::kPost: { switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { CbObject FunctionSpec = HttpReq.ReadPayloadObject(); // Determine which pieces are missing and need to be transmitted to populate CAS CasChunkSet ChunkSet; FunctionSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); ChunkSet.AddChunkToSet(Hash); }); // Note that we store executables uncompressed to make it // more straightforward and efficient to materialize them, hence // the CAS lookup here instead of CID for the input payloads m_CasStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { RwLock::ExclusiveLockScope _(m_WorkerLock); m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); ZEN_DEBUG("worker {}: all attachments already available", WorkerId); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } else { CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); ChunkSet.IterateChunks([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); }); ResponseWriter.EndArray(); ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } } break; case HttpContentType::kCbPackage: { CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); CbObject Obj = FunctionSpec.GetObject(); std::span Attachments = FunctionSpec.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); SharedBuffer Decompressed = DataView.Decompress(); const uint64_t DecompressedSize = DataView.GetRawSize(); ZEN_UNUSED(DataHash); TotalAttachmentBytes += DecompressedSize; ++AttachmentCount; // Note that we store executables uncompressed to make it // more straightforward and efficient to materialize them const CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); if (InsertResult.New) { TotalNewBytes += DecompressedSize; ++NewAttachmentCount; } } ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", WorkerId, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); RwLock::ExclusiveLockScope _(m_WorkerLock); m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; default: break; } } break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/{job}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: break; case HttpVerb::kPost: break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/{worker}/{action}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { CbPackage Output; HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; } }, HttpVerb::kGet); m_Router.RegisterRoute( "simple/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); WorkerDesc Worker; { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { Worker = It->second; } } switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { CbObject Output; HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } { RwLock::SharedLockScope _(m_WorkerLock); m_WorkerMap.erase(WorkerId); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; case HttpVerb::kPost: { CbObject Output; HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); WorkerDesc Worker; { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { Worker = It->second; } } switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: // TODO: return status of all pending or executing jobs break; case HttpVerb::kPost: switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { // This operation takes the proposed job spec and identifies which // chunks are not present on this server. This list is then returned in // the "need" list in the response IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); std::vector NeedList; RequestObject.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } }); if (NeedList.empty()) { // We already have everything CbObject Output; HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); } break; case HttpContentType::kCbPackage: { CbPackage Action = HttpReq.ReadPayloadPackage(); CbObject ActionObj = Action.GetObject(); std::span Attachments = Action.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); const uint64_t CompressedSize = DataView.GetCompressedSize(); TotalAttachmentBytes += CompressedSize; ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { TotalNewBytes += CompressedSize; ++NewAttachmentCount; } } ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); CbObject Output; HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; default: break; } break; default: break; } }, HttpVerb::kPost); } HttpFunctionService::~HttpFunctionService() { } const char* HttpFunctionService::BaseUri() const { return "/apply/"; } void HttpFunctionService::HandleRequest(HttpServerRequest& Request) { if (m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } HttpResponseCode HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object) { const IoHash WorkerId = Worker.Descriptor.GetHash(); ZEN_INFO("Action {} being processed...", WorkerId.ToHexString()); auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple}); if (!EnqueueResult.Success) { ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString()); return HttpResponseCode::InternalServerError; } CbObjectWriter Writer; Writer.AddHash("worker", WorkerId); Object = Writer.Save(); return HttpResponseCode::OK; } HttpResponseCode HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object) { const static IoHash Empty = CbObject().GetHash(); auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty); if (!Status.Success) { return HttpResponseCode::NotFound; } if (Status.Status.State != UpstreamApplyState::Complete) { return HttpResponseCode::Accepted; } GetUpstreamApplyResult& Completed = Status.Status.Result; if (!Completed.Success) { ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", WorkerId.ToHexString(), Completed.StdOut, Completed.StdErr, Completed.Error.Reason, Completed.Error.ErrorCode); if (Completed.Error.ErrorCode == 0) { Completed.Error.ErrorCode = -1; } if (Completed.StdErr.empty() && !Completed.Error.Reason.empty()) { Completed.StdErr = Completed.Error.Reason; } } else { ZEN_INFO("Action {} completed with {} files ExitCode={}", WorkerId.ToHexString(), Completed.OutputFiles.size(), Completed.Error.ErrorCode); } CbObjectWriter ResultObject; ResultObject.AddString("agent"sv, Completed.Agent); ResultObject.AddString("detail"sv, Completed.Detail); ResultObject.AddString("stdout"sv, Completed.StdOut); ResultObject.AddString("stderr"sv, Completed.StdErr); ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode); ResultObject.BeginArray("stats"sv); for (const auto& Timepoint : Completed.Timepoints) { ResultObject.BeginObject(); ResultObject.AddString("name"sv, Timepoint.first); ResultObject.AddDateTimeTicks("time"sv, Timepoint.second); ResultObject.EndObject(); } ResultObject.EndArray(); ResultObject.BeginArray("files"sv); for (const auto& File : Completed.OutputFiles) { ResultObject.BeginObject(); ResultObject.AddString("name"sv, File.first.string()); ResultObject.AddBinary("data"sv, Completed.FileData[File.second]); ResultObject.EndObject(); } ResultObject.EndArray(); Object = ResultObject.Save(); return HttpResponseCode::OK; } HttpResponseCode HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) { const IoHash WorkerId = Worker.Descriptor.GetHash(); const IoHash ActionId = Action.GetHash(); Action.MakeOwned(); ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); auto EnqueueResult = m_UpstreamApply->EnqueueUpstream( {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset}); if (!EnqueueResult.Success) { ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); return HttpResponseCode::InternalServerError; } CbObjectWriter Writer; Writer.AddHash("worker", WorkerId); Writer.AddHash("action", ActionId); Object = Writer.Save(); return HttpResponseCode::OK; } HttpResponseCode HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) { auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); if (!Status.Success) { return HttpResponseCode::NotFound; } if (Status.Status.State != UpstreamApplyState::Complete) { return HttpResponseCode::Accepted; } GetUpstreamApplyResult& Completed = Status.Status.Result; if (!Completed.Success || Completed.Error.ErrorCode != 0) { ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", WorkerId.ToHexString(), ActionId.ToHexString(), Completed.StdOut, Completed.StdErr, Completed.Error.Reason, Completed.Error.ErrorCode); return HttpResponseCode::InternalServerError; } ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", WorkerId.ToHexString(), ActionId.ToHexString(), Completed.OutputPackage.GetAttachments().size(), NiceBytes(Completed.TotalAttachmentBytes), NiceBytes(Completed.TotalRawAttachmentBytes)); Package = std::move(Completed.OutputPackage); return HttpResponseCode::OK; } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES