// Copyright Epic Games, Inc. All Rights Reserved. #include "apply.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include # include # include # include # pragma comment(lib, "UserEnv.lib") # include ZEN_THIRD_PARTY_INCLUDES_END # include # include using namespace std::literals; namespace zen { struct BasicFunctionJob { public: BasicFunctionJob() = default; ~BasicFunctionJob(); void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; } bool SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine); bool Wait(uint32_t TimeoutMs = ~0); int ExitCode(); private: std::filesystem::path m_WorkingDirectory; int m_ProcessId = 0; CHandle m_ProcessHandle; }; BasicFunctionJob::~BasicFunctionJob() { Wait(); } bool BasicFunctionJob::SpawnJob(std::filesystem::path ExePath, std::wstring CommandLine) { STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)}; PROCESS_INFORMATION ProcessInfo{}; std::wstring ExePathNative = ExePath.native(); std::wstring WorkingDirNative = m_WorkingDirectory.native(); BOOL Created = ::CreateProcess(ExePathNative.data() /* ApplicationName */, CommandLine.data() /* Command Line */, nullptr /* Process Attributes */, nullptr /* Security Attributes */, FALSE /* InheritHandles */, 0 /* Flags */, nullptr /* Environment */, WorkingDirNative.data() /* Current Directory */, (LPSTARTUPINFO)&StartupInfo, &ProcessInfo); if (!Created) { throw std::system_error(::GetLastError(), std::system_category(), fmt::format("Failed to create process '{}'", ExePath).c_str()); } m_ProcessId = ProcessInfo.dwProcessId; m_ProcessHandle.Attach(ProcessInfo.hProcess); ::CloseHandle(ProcessInfo.hThread); ZEN_INFO("Created process {}", m_ProcessId); return true; } bool BasicFunctionJob::Wait(uint32_t TimeoutMs) { if (!m_ProcessHandle) { return true; } DWORD WaitResult = WaitForSingleObject(m_ProcessHandle, TimeoutMs); if (WaitResult == WAIT_TIMEOUT) { return false; } if (WaitResult == WAIT_OBJECT_0) { return true; } throw std::runtime_error("Failed wait on process handle"); } int BasicFunctionJob::ExitCode() { DWORD Ec = 0; BOOL Success = GetExitCodeProcess(m_ProcessHandle, &Ec); if (!Success) { ZEN_WARN("failed getting exit code"); } if (Ec == STILL_ACTIVE) { ZEN_WARN("getting exit code but process is STILL_ACTIVE"); } return gsl::narrow_cast(Ec); } //////////////////////////////////////////////////////////////////////////////// struct SandboxedFunctionJob { SandboxedFunctionJob() = default; ~SandboxedFunctionJob() = default; void SetWorkingDirectory(const std::filesystem::path& WorkingDirectory) { m_WorkingDirectory = WorkingDirectory; } void Initialize(std::string_view AppContainerId); bool SpawnJob(std::filesystem::path ExePath); void AddWhitelistFile(const std::filesystem::path& FilePath) { m_WhitelistFiles.push_back(FilePath); } private: bool GrantNamedObjectAccess(PWSTR Name, SE_OBJECT_TYPE Type, ACCESS_MASK AccessMask, bool Recursive); std::filesystem::path m_WorkingDirectory; std::vector m_WhitelistFiles; std::vector m_WhitelistRegistryKeys; PSID m_AppContainerSid = nullptr; bool m_IsInitialized = false; }; bool SandboxedFunctionJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE ObjectType, ACCESS_MASK AccessMask, bool Recursive) { DWORD Status; PACL NewAcl = nullptr; DWORD grfInhericance = 0; if (Recursive) { grfInhericance = OBJECT_INHERIT_ACE | CONTAINER_INHERIT_ACE; } EXPLICIT_ACCESS Access{.grfAccessPermissions = AccessMask, .grfAccessMode = GRANT_ACCESS, .grfInheritance = grfInhericance, .Trustee = {.pMultipleTrustee = nullptr, .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, .TrusteeForm = TRUSTEE_IS_SID, .TrusteeType = TRUSTEE_IS_GROUP, .ptstrName = (PWSTR)m_AppContainerSid}}; PACL OldAcl = nullptr; Status = GetNamedSecurityInfo(ObjectName /* ObjectName */, ObjectType /* ObjectType */, DACL_SECURITY_INFORMATION /* SecurityInfo */, nullptr /* ppsidOwner */, nullptr /* ppsidGroup */, &OldAcl /* ppDacl */, nullptr /* ppSacl */, nullptr /* ppSecurityDescriptor */); if (Status != ERROR_SUCCESS) return false; Status = SetEntriesInAcl(1 /* CountOfExplicitEntries */, &Access /* pListOfExplicitEntries */, OldAcl, &NewAcl); if (Status != ERROR_SUCCESS) return false; Status = SetNamedSecurityInfo(ObjectName /* ObjectName */, ObjectType /* ObjectType */, DACL_SECURITY_INFORMATION /*SecurityInfo */, nullptr /* psidOwner */, nullptr /* psidGroup */, NewAcl /* pDacl */, nullptr /* pSacl */); if (NewAcl) ::LocalFree(NewAcl); return Status == ERROR_SUCCESS; } void SandboxedFunctionJob::Initialize(std::string_view AppContainerId) { if (m_IsInitialized) { return; } std::wstring ContainerName = zen::Utf8ToWide(AppContainerId); HRESULT hRes = ::CreateAppContainerProfile(ContainerName.c_str(), ContainerName.c_str() /* Display Name */, ContainerName.c_str() /* Description */, nullptr /* Capabilities */, 0 /* Capability Count */, &m_AppContainerSid); if (FAILED(hRes)) { hRes = ::DeriveAppContainerSidFromAppContainerName(ContainerName.c_str(), &m_AppContainerSid); if (FAILED(hRes)) { ZEN_ERROR("Failed creating app container SID"); } } // Debugging context PWSTR Str = nullptr; ::ConvertSidToStringSid(m_AppContainerSid, &Str); ZEN_INFO("AppContainer SID : '{}'", WideToUtf8(Str)); PWSTR Path = nullptr; if (SUCCEEDED(::GetAppContainerFolderPath(Str, &Path))) { ZEN_INFO("AppContainer folder: '{}'", WideToUtf8(Path)); ::CoTaskMemFree(Path); } ::LocalFree(Str); m_IsInitialized = true; } bool SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) { // Build process attributes SECURITY_CAPABILITIES Sc = {0}; Sc.AppContainerSid = m_AppContainerSid; STARTUPINFOEX StartupInfo = {sizeof(STARTUPINFOEX)}; PROCESS_INFORMATION ProcessInfo{}; SIZE_T Size = 0; ::InitializeProcThreadAttributeList(nullptr, 1, 0, &Size); auto AttrBuffer = std::make_unique(Size); StartupInfo.lpAttributeList = reinterpret_cast(AttrBuffer.get()); if (!::InitializeProcThreadAttributeList(StartupInfo.lpAttributeList, 1, 0, &Size)) { return false; } if (!::UpdateProcThreadAttribute(StartupInfo.lpAttributeList, 0, PROC_THREAD_ATTRIBUTE_SECURITY_CAPABILITIES, &Sc, sizeof Sc, nullptr, nullptr)) { return false; } // Set up security for files/folders/registry for (const std::filesystem::path& File : m_WhitelistFiles) { std::wstring NativeFileName = File.native(); GrantNamedObjectAccess(NativeFileName.data(), SE_FILE_OBJECT, FILE_ALL_ACCESS, true); } for (std::wstring& RegKey : m_WhitelistRegistryKeys) { GrantNamedObjectAccess(RegKey.data(), SE_REGISTRY_WOW64_32KEY, KEY_ALL_ACCESS, true); } std::wstring ExePathNative = ExePath.native(); std::wstring WorkingDirNative = m_WorkingDirectory.native(); BOOL Created = ::CreateProcess(nullptr /* ApplicationName */, ExePathNative.data() /* Command line */, nullptr /* Process Attributes */, nullptr /* Security Attributes */, FALSE /* InheritHandles */, EXTENDED_STARTUPINFO_PRESENT | CREATE_NEW_CONSOLE /* Flags */, nullptr /* Environment */, WorkingDirNative.data() /* Current Directory */, (LPSTARTUPINFO)&StartupInfo, &ProcessInfo); DeleteProcThreadAttributeList(StartupInfo.lpAttributeList); if (!Created) { return false; } ZEN_INFO("Created process {}", ProcessInfo.dwProcessId); return true; } //////////////////////////////////////////////////////////////////////////////// HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir, const CloudCacheClientOptions& ComputeOptions, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& ComputeAuthConfig, const UpstreamAuthConfig& StorageAuthConfig, AuthMgr& Mgr) : m_Log(logging::Get("apply")) , m_CasStore(Store) , m_CidStore(InCidStore) , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, ComputeAuthConfig, StorageOptions, StorageAuthConfig, m_CasStore, m_CidStore, Mgr); m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); m_UpstreamApply->Initialize(); m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { const WorkerDesc& Desc = It->second; return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); } } break; case HttpVerb::kPost: { switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { CbObject FunctionSpec = HttpReq.ReadPayloadObject(); // Determine which pieces are missing and need to be transmitted to populate CAS CasChunkSet ChunkSet; FunctionSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); ChunkSet.AddChunkToSet(Hash); }); // Note that we store executables uncompressed to make it // more straightforward and efficient to materialize them, hence // the CAS lookup here instead of CID for the input payloads m_CasStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { RwLock::ExclusiveLockScope _(m_WorkerLock); m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); ZEN_DEBUG("worker {}: all attachments already available", WorkerId); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } else { CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); ChunkSet.IterateChunks([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); }); ResponseWriter.EndArray(); ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } } break; case HttpContentType::kCbPackage: { CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); CbObject Obj = FunctionSpec.GetObject(); std::span Attachments = FunctionSpec.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); SharedBuffer Decompressed = DataView.Decompress(); const uint64_t DecompressedSize = DataView.GetRawSize(); ZEN_UNUSED(DataHash); TotalAttachmentBytes += DecompressedSize; ++AttachmentCount; // Note that we store executables uncompressed to make it // more straightforward and efficient to materialize them const CasStore::InsertResult InsertResult = m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); if (InsertResult.New) { TotalNewBytes += DecompressedSize; ++NewAttachmentCount; } } ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", WorkerId, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); RwLock::ExclusiveLockScope _(m_WorkerLock); m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; default: break; } } break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/{job}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: break; case HttpVerb::kPost: break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/{worker}/{action}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { CbPackage Output; HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; } }, HttpVerb::kGet); m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); WorkerDesc Worker; { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } else { Worker = It->second; } } switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: // TODO: return status of all pending or executing jobs break; case HttpVerb::kPost: switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { // This operation takes the proposed job spec and identifies which // chunks are not present on this server. This list is then returned in // the "need" list in the response IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); std::vector NeedList; RequestObject.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } }); if (NeedList.empty()) { // We already have everything CbObject Output; HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); } break; case HttpContentType::kCbPackage: { CbPackage Action = HttpReq.ReadPayloadPackage(); CbObject ActionObj = Action.GetObject(); std::span Attachments = Action.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); const uint64_t CompressedSize = DataView.GetCompressedSize(); TotalAttachmentBytes += CompressedSize; ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { TotalNewBytes += CompressedSize; ++NewAttachmentCount; } } ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); CbObject Output; HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); if (ResponseCode != HttpResponseCode::OK) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; default: break; } break; default: break; } }, HttpVerb::kPost); } HttpFunctionService::~HttpFunctionService() { } const char* HttpFunctionService::BaseUri() const { return "/apply/"; } void HttpFunctionService::HandleRequest(HttpServerRequest& Request) { if (m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } std::filesystem::path HttpFunctionService::CreateNewSandbox() { std::string UniqueId = std::to_string(++m_SandboxCount); std::filesystem::path Path = m_SandboxPath / UniqueId; zen::CreateDirectories(Path); return Path; } CbPackage HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) { using namespace std::literals; std::filesystem::path SandboxPath = CreateNewSandbox(); CbObject Desc = Worker.Descriptor; // Manifest worker in Sandbox for (auto& It : Desc["executables"]) { CbObjectView ExecEntry = It.AsObjectView(); std::string_view Name = ExecEntry["name"sv].AsString(); const IoHash ChunkHash = ExecEntry["hash"sv].AsHash(); const uint64_t Size = ExecEntry["size"sv].AsUInt64(); std::filesystem::path FilePath{SandboxPath / Name}; IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); if (!DataBuffer) { throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash)); } if (DataBuffer.Size() != Size) { throw std::runtime_error( fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); } zen::WriteFile(FilePath, DataBuffer); } for (auto& It : Desc["dirs"]) { std::string_view Name = It.AsString(); std::filesystem::path DirPath{SandboxPath / Name}; zen::CreateDirectories(DirPath); } for (auto& It : Desc["files"]) { CbObjectView FileEntry = It.AsObjectView(); std::string_view Name = FileEntry["name"sv].AsString(); const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); const uint64_t Size = FileEntry["size"sv].AsUInt64(); std::filesystem::path FilePath{SandboxPath / Name}; IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); if (!DataBuffer) { throw std::runtime_error(fmt::format("worker CAS chunk '{}' missing", ChunkHash)); } if (DataBuffer.Size() != Size) { throw std::runtime_error( fmt::format("worker CAS chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); } zen::WriteFile(FilePath, DataBuffer); } // Write out action zen::WriteFile(SandboxPath / "build.action", Action.GetBuffer().AsIoBuffer()); // Manifest inputs in sandbox Action.IterateAttachments([&](CbFieldView Field) { const IoHash Cid = Field.AsHash(); std::filesystem::path FilePath{SandboxPath / "Inputs" / Cid.ToHexString()}; IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); if (!DataBuffer) { throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); } zen::WriteFile(FilePath, DataBuffer); }); // Set up environment variables StringBuilder<1024> EnvironmentBlock; for (auto& It : Desc["environment"]) { EnvironmentBlock.Append(It.AsString()); EnvironmentBlock.Append('\0'); } EnvironmentBlock.Append('\0'); EnvironmentBlock.Append('\0'); // Execute process std::string_view ExecPath = Desc["path"].AsString(); std::filesystem::path ExePath = SandboxPath / ExecPath; WideStringBuilder<512> CommandLine; CommandLine.Append(L'"'); CommandLine.Append(ExePath.c_str()); CommandLine.Append(L'"'); CommandLine.Append(L" -Build=build.action"); LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; BOOL bInheritHandles = FALSE; DWORD dwCreationFlags = 0; STARTUPINFO StartupInfo{}; StartupInfo.cb = sizeof StartupInfo; PROCESS_INFORMATION ProcessInformation{}; BOOL Success = CreateProcessW(nullptr, CommandLine.Data(), lpProcessAttributes, lpThreadAttributes, bInheritHandles, dwCreationFlags, (LPVOID)EnvironmentBlock.Data(), // Environment block SandboxPath.c_str(), // Current directory &StartupInfo, /* out */ &ProcessInformation); if (!Success) { zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); } CloseHandle(ProcessInformation.hThread); auto _ = MakeGuard([&] { CloseHandle(ProcessInformation.hProcess); }); DWORD Result = WaitForSingleObject(ProcessInformation.hProcess, INFINITE); if (Result != WAIT_OBJECT_0) { zen::ThrowLastError("Process wait failed" /* TODO: Add context */); } DWORD ExitCode = 0; GetExitCodeProcess(ProcessInformation.hProcess, &ExitCode); // Gather outputs FileContents OutputData = zen::ReadFile(SandboxPath / "build.output"); if (OutputData.ErrorCode) { throw std::system_error(OutputData.ErrorCode, "Failed to read build output file"); } // TODO: should have a more straightforward way to perform this ZEN_ASSERT(OutputData.Data.size() == 1); CbPackage OutputPackage; CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); uint64_t TotalAttachmentBytes = 0; uint64_t TotalRawAttachmentBytes = 0; Output.IterateAttachments([&](CbFieldView Field) { IoHash Hash = Field.AsHash(); std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; FileContents ChunkData = zen::ReadFile(OutputPath); if (ChunkData.ErrorCode) { throw std::system_error(ChunkData.ErrorCode, "Failed to read build output chunk file"); } ZEN_ASSERT(OutputData.Data.size() == 1); CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])); if (!AttachmentBuffer) { throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); } TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize(); CbAttachment Attachment(AttachmentBuffer); OutputPackage.AddAttachment(Attachment); }); OutputPackage.SetObject(Output); ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", OutputPackage.GetAttachments().size(), NiceBytes(TotalAttachmentBytes), NiceBytes(TotalRawAttachmentBytes)); return OutputPackage; } HttpResponseCode HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) { const IoHash WorkerId = Worker.Descriptor.GetHash(); const IoHash ActionId = Action.GetHash(); Action.MakeOwned(); ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); if (!EnqueueResult.Success) { ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); return HttpResponseCode::InternalServerError; } CbObjectWriter Writer; Writer.AddHash("worker", WorkerId); Writer.AddHash("action", ActionId); Object = std::move(Writer.Save()); return HttpResponseCode::OK; } HttpResponseCode HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) { auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); if (!Status.Success) { // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); return HttpResponseCode::NotFound; } if (Status.Status.State != UpstreamApplyState::Complete) { return HttpResponseCode::Accepted; } GetUpstreamApplyResult& Completed = Status.Status.Result; if (!Completed.Success || Completed.Error.ErrorCode != 0) { ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", WorkerId.ToHexString(), ActionId.ToHexString(), Completed.StdOut, Completed.StdErr, Completed.Error.Reason, Completed.Error.ErrorCode); return HttpResponseCode::InternalServerError; } ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", WorkerId.ToHexString(), ActionId.ToHexString(), Completed.OutputPackage.GetAttachments().size(), NiceBytes(Completed.TotalAttachmentBytes), NiceBytes(Completed.TotalRawAttachmentBytes)); Package = std::move(Completed.OutputPackage); return HttpResponseCode::OK; } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES