diff options
| author | Stefan Boberg <[email protected]> | 2023-10-11 10:03:54 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-11 10:03:54 +0200 |
| commit | 1ad940fafb5e3eae7b308dd290b6de6ade69a3eb (patch) | |
| tree | 1d1efe188f45bc422292e75c6784929765882771 /src | |
| parent | fix clang-format whoopsie (diff) | |
| download | zen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.tar.xz zen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.zip | |
remove legacy compute interfaces (#461)
* removed legacy compute code, which will be replaced with a new implementation in the future
* also updated references to Jupiter storage
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/compute/function.cpp | 629 | ||||
| -rw-r--r-- | src/zenserver/compute/function.h | 73 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 150 | ||||
| -rw-r--r-- | src/zenserver/config.h | 66 | ||||
| -rw-r--r-- | src/zenserver/upstream/hordecompute.cpp | 1457 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 129 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 3 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamapply.cpp | 459 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamapply.h | 192 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 79 | ||||
| -rw-r--r-- | src/zenserver/zenserver.h | 14 |
12 files changed, 41 insertions, 3212 deletions
diff --git a/src/zenserver/compute/function.cpp b/src/zenserver/compute/function.cpp deleted file mode 100644 index 493e2666e..000000000 --- a/src/zenserver/compute/function.cpp +++ /dev/null @@ -1,629 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "function.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <upstream/jupiter.h> -# include <upstream/upstreamapply.h> -# include <upstream/upstreamcache.h> -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/scopeguard.h> -# include <zenstore/cidstore.h> - -# include <span> - -using namespace std::literals; - -namespace zen { - -HttpFunctionService::HttpFunctionService(CidStore& InCidStore, - const CloudCacheClientOptions& ComputeOptions, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const UpstreamAuthConfig& StorageAuthConfig, - AuthMgr& Mgr) -: m_Log(logging::Get("apply")) -, m_CidStore(InCidStore) -{ - m_UpstreamApply = UpstreamApply::Create({}, m_CidStore); - - InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] { - auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - m_CidStore, - Mgr); - m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); - m_UpstreamApply->Initialize(); - }}; - - m_Router.AddPattern("job", "([[:digit:]]+)"); - m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); - m_Router.AddPattern("action", "([[:xdigit:]]{40})"); - - m_Router.RegisterRoute( - "ready", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "workers/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - const WorkerDesc& Desc = It->second; - return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); - } - } - break; - - case HttpVerb::kPost: - { - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - CbObject FunctionSpec = HttpReq.ReadPayloadObject(); - - // Determine which pieces are missing and need to be transmitted to populate CAS - - HashKeySet ChunkSet; - - FunctionSpec.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - ChunkSet.AddHashToSet(Hash); - }); - - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them, hence - // the CAS lookup here instead of CID for the input payloads - - m_CidStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); - - ZEN_DEBUG("worker {}: all attachments already available", WorkerId); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - else - { - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("need"); - - ChunkSet.IterateHashes([&](const IoHash& Hash) { - ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); - - ResponseWriter.AddHash(Hash); - }); - - ResponseWriter.EndArray(); - - ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); - } - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); - - CbObject Obj = FunctionSpec.GetObject(); - - std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer Buffer = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - TotalAttachmentBytes += Buffer.GetCompressedSize(); - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += Buffer.GetCompressedSize(); - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", - WorkerId, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{job}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - break; - - case HttpVerb::kPost: - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}/{action}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbPackage Output; - HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "simple/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker; - - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Worker = It->second; - } - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - - { - RwLock::SharedLockScope _(m_WorkerLock); - m_WorkerMap.erase(WorkerId); - } - - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - case HttpVerb::kPost: - { - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker; - - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Worker = It->second; - } - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - // TODO: return status of all pending or executing jobs - break; - - case HttpVerb::kPost: - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - RequestObject.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - default: - break; - } - break; - - default: - break; - } - }, - HttpVerb::kPost); -} - -HttpFunctionService::~HttpFunctionService() -{ -} - -const char* -HttpFunctionService::BaseUri() const -{ - return "/apply/"; -} - -void -HttpFunctionService::HandleRequest(HttpServerRequest& Request) -{ - if (m_Router.HandleRequest(Request) == false) - { - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object) -{ - const IoHash WorkerId = Worker.Descriptor.GetHash(); - - ZEN_INFO("Action {} being processed...", WorkerId.ToHexString()); - - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple}); - if (!EnqueueResult.Success) - { - ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString()); - return HttpResponseCode::InternalServerError; - } - - CbObjectWriter Writer; - Writer.AddHash("worker", WorkerId); - - Object = Writer.Save(); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object) -{ - const static IoHash Empty = CbObject().GetHash(); - auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty); - if (!Status.Success) - { - return HttpResponseCode::NotFound; - } - - if (Status.Status.State != UpstreamApplyState::Complete) - { - return HttpResponseCode::Accepted; - } - - GetUpstreamApplyResult& Completed = Status.Status.Result; - - if (!Completed.Success) - { - ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", - WorkerId.ToHexString(), - Completed.StdOut, - Completed.StdErr, - Completed.Error.Reason, - Completed.Error.ErrorCode); - - if (Completed.Error.ErrorCode == 0) - { - Completed.Error.ErrorCode = -1; - } - if (Completed.StdErr.empty() && !Completed.Error.Reason.empty()) - { - Completed.StdErr = Completed.Error.Reason; - } - } - else - { - ZEN_INFO("Action {} completed with {} files ExitCode={}", - WorkerId.ToHexString(), - Completed.OutputFiles.size(), - Completed.Error.ErrorCode); - } - - CbObjectWriter ResultObject; - - ResultObject.AddString("agent"sv, Completed.Agent); - ResultObject.AddString("detail"sv, Completed.Detail); - ResultObject.AddString("stdout"sv, Completed.StdOut); - ResultObject.AddString("stderr"sv, Completed.StdErr); - ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode); - ResultObject.BeginArray("stats"sv); - for (const auto& Timepoint : Completed.Timepoints) - { - ResultObject.BeginObject(); - ResultObject.AddString("name"sv, Timepoint.first); - ResultObject.AddDateTimeTicks("time"sv, Timepoint.second); - ResultObject.EndObject(); - } - ResultObject.EndArray(); - - ResultObject.BeginArray("files"sv); - for (const auto& File : Completed.OutputFiles) - { - ResultObject.BeginObject(); - ResultObject.AddString("name"sv, File.first.string()); - ResultObject.AddBinary("data"sv, Completed.FileData[File.second]); - ResultObject.EndObject(); - } - ResultObject.EndArray(); - - Object = ResultObject.Save(); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) -{ - const IoHash WorkerId = Worker.Descriptor.GetHash(); - const IoHash ActionId = Action.GetHash(); - - Action.MakeOwned(); - - ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); - - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream( - {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset}); - - if (!EnqueueResult.Success) - { - ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); - return HttpResponseCode::InternalServerError; - } - - CbObjectWriter Writer; - Writer.AddHash("worker", WorkerId); - Writer.AddHash("action", ActionId); - - Object = Writer.Save(); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) -{ - auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); - if (!Status.Success) - { - return HttpResponseCode::NotFound; - } - - if (Status.Status.State != UpstreamApplyState::Complete) - { - return HttpResponseCode::Accepted; - } - - GetUpstreamApplyResult& Completed = Status.Status.Result; - if (!Completed.Success || Completed.Error.ErrorCode != 0) - { - ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.StdOut, - Completed.StdErr, - Completed.Error.Reason, - Completed.Error.ErrorCode); - - return HttpResponseCode::InternalServerError; - } - - ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.OutputPackage.GetAttachments().size(), - NiceBytes(Completed.TotalAttachmentBytes), - NiceBytes(Completed.TotalRawAttachmentBytes)); - - Package = std::move(Completed.OutputPackage); - return HttpResponseCode::OK; -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/compute/function.h b/src/zenserver/compute/function.h deleted file mode 100644 index 650cee757..000000000 --- a/src/zenserver/compute/function.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> - -#if !defined(ZEN_WITH_COMPUTE_SERVICES) -# define ZEN_WITH_COMPUTE_SERVICES 1 -#endif - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <zencore/compactbinary.h> -# include <zencore/iohash.h> -# include <zencore/logging.h> -# include <zenhttp/httpserver.h> - -# include <filesystem> -# include <unordered_map> - -namespace zen { - -class CidStore; -class UpstreamApply; -class CloudCacheClient; -class AuthMgr; - -struct UpstreamAuthConfig; -struct CloudCacheClientOptions; - -/** - * Lambda style compute function service - */ -class HttpFunctionService : public HttpService -{ -public: - HttpFunctionService(CidStore& InCidStore, - const CloudCacheClientOptions& ComputeOptions, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const UpstreamAuthConfig& StorageAuthConfig, - AuthMgr& Mgr); - ~HttpFunctionService(); - - virtual const char* BaseUri() const override; - virtual void HandleRequest(HttpServerRequest& Request) override; - -private: - std::thread InitializeThread; - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - HttpRequestRouter m_Router; - CidStore& m_CidStore; - std::unique_ptr<UpstreamApply> m_UpstreamApply; - - struct WorkerDesc - { - CbObject Descriptor; - }; - - [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object); - [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object); - - [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object); - [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); - - RwLock m_WorkerLock; - std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; -}; - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 342f41b68..85db7bade 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -855,57 +855,6 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("cache.upstream.zen.dns"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Dns); LuaOptions.AddOption("cache.upstream.zen.url"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Urls); - ////// exec - LuaOptions.AddOption("exec.enable"sv, ServerOptions.ExecServiceEnabled); - - ////// compute - LuaOptions.AddOption("compute.enable"sv, ServerOptions.ComputeServiceEnabled); - - LuaOptions.AddOption("compute.upstream.horde.name"sv, ServerOptions.UpstreamCacheConfig.HordeConfig.Name); - LuaOptions.AddOption("compute.upstream.horde.url"sv, ServerOptions.UpstreamCacheConfig.HordeConfig.Url, "upstream-horde-url"sv); - LuaOptions.AddOption("compute.upstream.horde.oauthprovider"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl, - "upstream-horde-oauth-url"sv); - LuaOptions.AddOption("compute.upstream.horde.oauthclientid"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId, - "upstream-horde-oauth-clientid"sv); - LuaOptions.AddOption("compute.upstream.horde.oauthclientsecret"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret, - "upstream-horde-oauth-clientsecret"sv); - LuaOptions.AddOption("compute.upstream.horde.openidprovider"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider, - "upstream-horde-openid-provider"sv); - LuaOptions.AddOption("compute.upstream.horde.token"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken, - "upstream-horde-token"sv); - LuaOptions.AddOption("compute.upstream.horde.cluster"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster, - "upstream-horde-cluster"sv); - LuaOptions.AddOption("compute.upstream.horde.namespace"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace, - "upstream-horde-namespace"sv); - - ////// compute storage - LuaOptions.AddOption("compute.upstream.storage.name"sv, ServerOptions.UpstreamCacheConfig.HordeConfig.Name); - LuaOptions.AddOption("compute.upstream.storage.url"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.Url, - "upstream-horde-storage-oauth-url"sv); - LuaOptions.AddOption("compute.upstream.storage.oauthprovider"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl, - "upstream-horde-storage-oauth-url"sv); - LuaOptions.AddOption("compute.upstream.storage.oauthclientid"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId, - "upstream-horde-storage-oauth-clientid"sv); - LuaOptions.AddOption("compute.upstream.storage.oauthclientsecret"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret, - "upstream-horde-storage-oauth-clientsecret"sv); - LuaOptions.AddOption("compute.upstream.storage.openidprovider"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider, - "upstream-horde-storage-openid-provider"sv); - LuaOptions.AddOption("compute.upstream.storage.token"sv, - ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken, - "upstream-horde-storage-token"sv); - LuaOptions.AddOption("gc.enabled"sv, ServerOptions.GcConfig.Enabled, "gc-enabled"sv); LuaOptions.AddOption("gc.monitorintervalseconds"sv, ServerOptions.GcConfig.MonitorIntervalSeconds, "gc-monitor-interval-seconds"sv); @@ -1208,13 +1157,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"), ""); - options.add_option("compute", - "", - "upstream-horde-url", - "URL to a Horde instance.", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Url)->default_value(""), - ""); - options.add_option("cache", "", "cache-write-log", @@ -1257,98 +1199,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"), ""); - options.add_option("compute", - "", - "upstream-horde-oauth-url", - "URL to the OAuth provier", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-oauth-clientid", - "The OAuth client ID", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-oauth-clientsecret", - "The OAuth client secret", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-openid-provider", - "Name of a registered Open ID provider", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-token", - "A static authentication token", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-storage-url", - "URL to a Horde Storage instance.", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageUrl)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-storage-oauth-url", - "URL to the OAuth provier", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-storage-oauth-clientid", - "The OAuth client ID", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId)->default_value(""), - ""); - - options.add_option( - "compute", - "", - "upstream-horde-storage-oauth-clientsecret", - "The OAuth client secret", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-storage-openid-provider", - "Name of a registered Open ID provider", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-storage-token", - "A static authentication token", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-cluster", - "The Horde compute cluster id", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster)->default_value(""), - ""); - - options.add_option("compute", - "", - "upstream-horde-namespace", - "The Jupiter namespace to use with Horde compute", - cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace)->default_value(""), - ""); - options.add_option("gc", "", "gc-enabled", diff --git a/src/zenserver/config.h b/src/zenserver/config.h index ee57b23cc..d3b20218f 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -23,27 +23,6 @@ struct ZenUpstreamJupiterConfig std::string DdcNamespace; }; -struct ZenUpstreamHordeConfig -{ - std::string Name; - std::string Url; - std::string OAuthUrl; - std::string OAuthClientId; - std::string OAuthClientSecret; - std::string OpenIdProvider; - std::string AccessToken; - - std::string StorageUrl; - std::string StorageOAuthUrl; - std::string StorageOAuthClientId; - std::string StorageOAuthClientSecret; - std::string StorageOpenIdProvider; - std::string StorageAccessToken; - - std::string Cluster; - std::string Namespace; -}; - struct ZenUpstreamZenConfig { std::string Name; @@ -62,7 +41,6 @@ enum class UpstreamCachePolicy : uint8_t struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; - ZenUpstreamHordeConfig HordeConfig; ZenUpstreamZenConfig ZenConfig; int32_t UpstreamThreadCount = 4; int32_t ConnectTimeoutMilliseconds = 5000; @@ -137,29 +115,27 @@ struct ZenServerOptions ZenObjectStoreConfig ObjectStoreConfig; zen::HttpServerConfig HttpServerConfig; ZenStructuredCacheConfig StructuredCacheConfig; - std::filesystem::path DataDir; // Root directory for state (used for testing) - std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) - std::filesystem::path AbsLogFile; // Absolute path to main log file - std::filesystem::path ConfigFile; // Path to Lua config file - std::string ChildId; // Id assigned by parent process (used for lifetime management) - std::string LogId; // Id for tagging log output - std::string EncryptionKey; // 256 bit AES encryption key - std::string EncryptionIV; // 128 bit AES initialization vector - int BasePort = 1337; // Service listen port (used for both UDP and TCP) - int OwnerPid = 0; // Parent process id (zero for standalone) - bool InstallService = false; // Flag used to initiate service install (temporary) - bool UninstallService = false; // Flag used to initiate service uninstall (temporary) - bool IsDebug = false; - bool IsTest = false; - bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements - bool ExecServiceEnabled = true; - bool ComputeServiceEnabled = true; - bool ShouldCrash = false; // Option for testing crash handling - bool IsFirstRun = false; - bool NoSentry = false; - bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports - bool ObjectStoreEnabled = false; - bool NoConsoleOutput = false; // Control default use of stdout for diagnostics + std::filesystem::path DataDir; // Root directory for state (used for testing) + std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) + std::filesystem::path AbsLogFile; // Absolute path to main log file + std::filesystem::path ConfigFile; // Path to Lua config file + std::string ChildId; // Id assigned by parent process (used for lifetime management) + std::string LogId; // Id for tagging log output + std::string EncryptionKey; // 256 bit AES encryption key + std::string EncryptionIV; // 128 bit AES initialization vector + int BasePort = 1337; // Service listen port (used for both UDP and TCP) + int OwnerPid = 0; // Parent process id (zero for standalone) + bool InstallService = false; // Flag used to initiate service install (temporary) + bool UninstallService = false; // Flag used to initiate service uninstall (temporary) + bool IsDebug = false; + bool IsTest = false; + bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements + bool ShouldCrash = false; // Option for testing crash handling + bool IsFirstRun = false; + bool NoSentry = false; + bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports + bool ObjectStoreEnabled = false; + bool NoConsoleOutput = false; // Control default use of stdout for diagnostics #if ZEN_WITH_TRACE std::string TraceHost; // Host name or IP address to send trace data to std::string TraceFile; // Path of a file to write a trace diff --git a/src/zenserver/upstream/hordecompute.cpp b/src/zenserver/upstream/hordecompute.cpp deleted file mode 100644 index cf921eaad..000000000 --- a/src/zenserver/upstream/hordecompute.cpp +++ /dev/null @@ -1,1457 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "upstreamapply.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include "jupiter.h" - -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compactbinaryvalidation.h> -# include <zencore/fmtutils.h> -# include <zencore/session.h> -# include <zencore/stream.h> -# include <zencore/thread.h> -# include <zencore/timer.h> -# include <zencore/workthreadpool.h> - -# include <zenstore/cidstore.h> - -# include <zenhttp/auth/authmgr.h> -# include <upstream/upstreamcache.h> - -# include "cache/structuredcachestore.h" -# include "diag/logging.h" - -# include <fmt/format.h> - -# include <algorithm> -# include <atomic> -# include <set> -# include <stack> - -namespace zen { - -using namespace std::literals; - -static const IoBuffer EmptyBuffer; -static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); - -namespace detail { - - class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint - { - public: - HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& StorageAuthConfig, - CidStore& CidStore, - AuthMgr& Mgr) - : m_Log(logging::Get("upstream-apply")) - , m_CidStore(CidStore) - , m_AuthMgr(Mgr) - { - m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); - m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); - - { - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; - - if (ComputeAuthConfig.OAuthUrl.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, - .ClientId = ComputeAuthConfig.OAuthClientId, - .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); - } - else if (ComputeAuthConfig.OpenIdProvider.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { - AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); - } - else - { - CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); - } - - m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); - } - - { - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; - - if (StorageAuthConfig.OAuthUrl.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, - .ClientId = StorageAuthConfig.OAuthClientId, - .ClientSecret = StorageAuthConfig.OAuthClientSecret}); - } - else if (StorageAuthConfig.OpenIdProvider.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { - AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); - } - else - { - CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); - } - - m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); - } - } - - virtual ~HordeUpstreamApplyEndpoint() = default; - - virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } - - virtual bool IsHealthy() const override { return m_HealthOk.load(); } - - virtual UpstreamEndpointHealth CheckHealth() override - { - try - { - CloudCacheSession Session(m_Client); - CloudCacheResult Result = Session.Authenticate(); - - m_HealthOk = Result.ErrorCode == 0; - - return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; - } - catch (std::exception& Err) - { - return {.Reason = Err.what(), .Ok = false}; - } - } - - virtual std::string_view DisplayName() const override { return m_DisplayName; } - - virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override - { - PostUpstreamApplyResult ApplyResult{}; - ApplyResult.Timepoints.merge(ApplyRecord.Timepoints); - - try - { - UpstreamData UpstreamData; - if (!ProcessApplyKey(ApplyRecord, UpstreamData)) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; - } - - { - ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks(); - - bool AlreadyQueued; - { - std::scoped_lock Lock(m_TaskMutex); - AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId); - } - if (AlreadyQueued) - { - // Pending task is already queued, return success - ApplyResult.Success = true; - return ApplyResult; - } - m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord); - } - - CloudCacheSession ComputeSession(m_Client); - CloudCacheSession StorageSession(m_StorageClient); - - { - CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds); - ApplyResult.Bytes += Result.Bytes; - ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; - ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks(); - if (!Result.Success) - { - ApplyResult.Error = {.ErrorCode = Result.ErrorCode, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}; - return ApplyResult; - } - UpstreamData.Blobs.clear(); - UpstreamData.CasIds.clear(); - } - - { - CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids); - ApplyResult.Bytes += Result.Bytes; - ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; - ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks(); - if (!Result.Success) - { - ApplyResult.Error = { - .ErrorCode = Result.ErrorCode, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload compressed blobs"}; - return ApplyResult; - } - UpstreamData.Cids.clear(); - } - - { - CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); - ApplyResult.Bytes += Result.Bytes; - ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; - ApplyResult.Timepoints["zen-storage-upload-objects"] = DateTime::NowTicks(); - if (!Result.Success) - { - ApplyResult.Error = {.ErrorCode = Result.ErrorCode, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}; - return ApplyResult; - } - } - - { - PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(), - "requests"sv, - UpstreamData.TaskId, - UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), - ZenContentType::kCbObject); - Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}", - UpstreamData.TaskId, - RefResult.Needs.size(), - RefResult.Bytes, - RefResult.ElapsedSeconds, - RefResult.Success); - ApplyResult.Bytes += RefResult.Bytes; - ApplyResult.ElapsedSeconds += RefResult.ElapsedSeconds; - ApplyResult.Timepoints["zen-storage-put-ref"] = DateTime::NowTicks(); - - if (RefResult.Needs.size() > 0) - { - Log().error("Failed to add task ref {} due to {} missing blobs", UpstreamData.TaskId, RefResult.Needs.size()); - for (const auto& Hash : RefResult.Needs) - { - Log().debug("Task ref {} missing blob {}", UpstreamData.TaskId, Hash); - } - - ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, - .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) - : "Failed to add task ref due to missing blob"}; - return ApplyResult; - } - - if (!RefResult.Success) - { - ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, - .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"}; - return ApplyResult; - } - UpstreamData.Objects.clear(); - } - - { - CbObjectWriter Writer; - Writer.AddString("c"sv, m_ChannelId); - Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); - Writer.BeginArray("t"sv); - Writer.AddObjectAttachment(UpstreamData.TaskId); - Writer.EndArray(); - CbObject TasksObject = Writer.Save(); - IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer(); - - CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); - Log().debug("Post compute task {} Bytes={} Duration={}s Result={}", - TasksObject.GetHash(), - Result.Bytes, - Result.ElapsedSeconds, - Result.Success); - ApplyResult.Bytes += Result.Bytes; - ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; - ApplyResult.Timepoints["zen-horde-post-task"] = DateTime::NowTicks(); - if (!Result.Success) - { - { - std::scoped_lock Lock(m_TaskMutex); - m_PendingTasks.erase(UpstreamData.TaskId); - } - - ApplyResult.Error = {.ErrorCode = Result.ErrorCode, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}; - return ApplyResult; - } - } - - Log().info("Task posted {}", UpstreamData.TaskId); - ApplyResult.Success = true; - return ApplyResult; - } - catch (std::exception& Err) - { - m_HealthOk = false; - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; - } - } - - [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, - const std::map<IoHash, IoBuffer>& Blobs, - const std::set<IoHash>& CasIds) - { - if (Blobs.size() == 0 && CasIds.size() == 0) - { - return {.Success = true}; - } - - int64_t Bytes{}; - double ElapsedSeconds{}; - - // Batch check for missing blobs - std::set<IoHash> Keys; - std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - Keys.insert(CasIds.begin(), CasIds.end()); - - CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys); - Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", - Keys.size(), - ExistsResult.Needs.size(), - ExistsResult.ElapsedSeconds, - ExistsResult.Success); - ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (!ExistsResult.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; - } - - for (const auto& Hash : ExistsResult.Needs) - { - IoBuffer DataBuffer; - if (Blobs.contains(Hash)) - { - DataBuffer = Blobs.at(Hash); - } - else - { - DataBuffer = m_CidStore.FindChunkByCid(Hash); - if (!DataBuffer) - { - Log().warn("Put blob FAILED, input chunk '{}' missing", Hash); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"}; - } - } - - CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); - Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; - } - } - - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - - [[nodiscard]] CloudCacheResult BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids) - { - if (Cids.size() == 0) - { - return {.Success = true}; - } - - int64_t Bytes{}; - double ElapsedSeconds{}; - - // Batch check for missing compressed blobs - CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids); - Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}", - Cids.size(), - ExistsResult.Needs.size(), - ExistsResult.ElapsedSeconds, - ExistsResult.Success); - ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (!ExistsResult.Success) - { - return { - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"}; - } - - for (const auto& Hash : ExistsResult.Needs) - { - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash); - if (!DataBuffer) - { - Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"}; - } - - CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); - Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}", - Hash, - Result.Bytes, - Result.ElapsedSeconds, - Result.Success); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put compressed blobs"}; - } - } - - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - - [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) - { - if (Objects.size() == 0) - { - return {.Success = true}; - } - - int64_t Bytes{}; - double ElapsedSeconds{}; - - // Batch check for missing objects - std::set<IoHash> Keys; - std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - - CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys); - Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", - Keys.size(), - ExistsResult.Needs.size(), - ExistsResult.ElapsedSeconds, - ExistsResult.Success); - ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (!ExistsResult.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; - } - - for (const auto& Hash : ExistsResult.Needs) - { - CloudCacheResult Result = - Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); - Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; - } - } - - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - - enum class ComputeTaskState : int32_t - { - Queued = 0, - Executing = 1, - Complete = 2, - }; - - enum class ComputeTaskOutcome : int32_t - { - Success = 0, - Failed = 1, - Cancelled = 2, - NoResult = 3, - Exipred = 4, - BlobNotFound = 5, - Exception = 6, - }; - - [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome) - { - switch (Outcome) - { - case ComputeTaskState::Queued: - return "Queued"sv; - case ComputeTaskState::Executing: - return "Executing"sv; - case ComputeTaskState::Complete: - return "Complete"sv; - }; - return "Unknown"sv; - } - - [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) - { - switch (Outcome) - { - case ComputeTaskOutcome::Success: - return "Success"sv; - case ComputeTaskOutcome::Failed: - return "Failed"sv; - case ComputeTaskOutcome::Cancelled: - return "Cancelled"sv; - case ComputeTaskOutcome::NoResult: - return "NoResult"sv; - case ComputeTaskOutcome::Exipred: - return "Exipred"sv; - case ComputeTaskOutcome::BlobNotFound: - return "BlobNotFound"sv; - case ComputeTaskOutcome::Exception: - return "Exception"sv; - }; - return "Unknown"sv; - } - - virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override - { - int64_t Bytes{}; - double ElapsedSeconds{}; - - { - std::scoped_lock Lock(m_TaskMutex); - if (m_PendingTasks.empty()) - { - if (m_CompletedTasks.empty()) - { - // Nothing to do. - return {.Success = true}; - } - - UpstreamApplyCompleted CompletedTasks; - std::swap(CompletedTasks, m_CompletedTasks); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; - } - } - - try - { - CloudCacheSession ComputeSession(m_Client); - - CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); - Log().debug("Get compute updates Bytes={} Duration={}s Result={}", - UpdatesResult.Bytes, - UpdatesResult.ElapsedSeconds, - UpdatesResult.Success); - Bytes += UpdatesResult.Bytes; - ElapsedSeconds += UpdatesResult.ElapsedSeconds; - if (!UpdatesResult.Success) - { - return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - if (!UpdatesResult.Success) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; - } - - CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response)); - - for (auto& It : TaskStatus["u"sv]) - { - CbObjectView Status = It.AsObjectView(); - IoHash TaskId = Status["h"sv].AsHash(); - const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); - const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32(); - - Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State)); - - // Only completed tasks need to be processed - if (State != ComputeTaskState::Complete) - { - continue; - } - - IoHash WorkerId{}; - IoHash ActionId{}; - UpstreamApplyType ApplyType{}; - - { - std::scoped_lock Lock(m_TaskMutex); - auto TaskIt = m_PendingTasks.find(TaskId); - if (TaskIt != m_PendingTasks.end()) - { - WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); - ActionId = TaskIt->second.Action.GetHash(); - ApplyType = TaskIt->second.Type; - m_PendingTasks.erase(TaskIt); - } - } - - if (WorkerId == IoHash::Zero) - { - Log().warn("Task {} missing from pending tasks", TaskId); - continue; - } - - std::map<std::string, uint64_t> Timepoints; - ProcessQueueTimings(Status["qs"sv].AsObjectView(), Timepoints); - ProcessExecuteTimings(Status["es"sv].AsObjectView(), Timepoints); - - if (Outcome != ComputeTaskOutcome::Success) - { - const std::string_view Detail = Status["d"sv].AsString(); - { - std::scoped_lock Lock(m_TaskMutex); - m_CompletedTasks[WorkerId][ActionId] = { - .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}, - .Timepoints = std::move(Timepoints)}; - } - continue; - } - - Timepoints["zen-complete-queue-added"] = DateTime::NowTicks(); - ThreadPool.ScheduleWork([this, - ApplyType, - ResultHash = Status["r"sv].AsHash(), - Timepoints = std::move(Timepoints), - TaskId = std::move(TaskId), - WorkerId = std::move(WorkerId), - ActionId = std::move(ActionId)]() mutable { - Timepoints["zen-complete-queue-dispatched"] = DateTime::NowTicks(); - GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash); - Timepoints["zen-complete-queue-complete"] = DateTime::NowTicks(); - Result.Timepoints.merge(Timepoints); - - Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}", - TaskId, - Result.OutputFiles.size(), - Result.OutputPackage.GetAttachments().size(), - Result.Error.ErrorCode); - { - std::scoped_lock Lock(m_TaskMutex); - m_CompletedTasks[WorkerId][ActionId] = std::move(Result); - } - }); - } - - { - std::scoped_lock Lock(m_TaskMutex); - if (m_CompletedTasks.empty()) - { - // Nothing to do. - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - UpstreamApplyCompleted CompletedTasks; - std::swap(CompletedTasks, m_CompletedTasks); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; - } - } - catch (std::exception& Err) - { - m_HealthOk = false; - return { - .Error{.ErrorCode = -1, .Reason = Err.what()}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - }; - } - } - - virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } - - private: - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - CidStore& m_CidStore; - AuthMgr& m_AuthMgr; - std::string m_DisplayName; - RefPtr<CloudCacheClient> m_Client; - RefPtr<CloudCacheClient> m_StorageClient; - UpstreamApplyEndpointStats m_Stats; - std::atomic_bool m_HealthOk{false}; - std::string m_ChannelId; - - std::mutex m_TaskMutex; - std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; - UpstreamApplyCompleted m_CompletedTasks; - - struct UpstreamData - { - std::map<IoHash, IoBuffer> Blobs; - std::map<IoHash, CbObject> Objects; - std::set<IoHash> CasIds; - std::set<IoHash> Cids; - IoHash TaskId; - IoHash RequirementsId; - }; - - struct UpstreamDirectory - { - std::filesystem::path Path; - std::map<std::string, UpstreamDirectory> Directories; - std::set<std::string> Files; - }; - - static void ProcessQueueTimings(CbObjectView QueueStats, std::map<std::string, uint64_t>& Timepoints) - { - uint64_t Ticks = QueueStats["t"sv].AsDateTimeTicks(); - if (Ticks == 0) - { - return; - } - - // Scope is an array of miliseconds after start time - // TODO: cleanup - Timepoints["horde-queue-added"] = Ticks; - int Index = 0; - for (auto& Item : QueueStats["s"sv].AsArrayView()) - { - Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; - switch (Index) - { - case 0: - Timepoints["horde-queue-dispatched"] = Ticks; - break; - case 1: - Timepoints["horde-queue-complete"] = Ticks; - break; - } - Index++; - } - } - - static void ProcessExecuteTimings(CbObjectView ExecutionStats, std::map<std::string, uint64_t>& Timepoints) - { - uint64_t Ticks = ExecutionStats["t"sv].AsDateTimeTicks(); - if (Ticks == 0) - { - return; - } - - // Scope is an array of miliseconds after start time - // TODO: cleanup - Timepoints["horde-execution-start"] = Ticks; - int Index = 0; - for (auto& Item : ExecutionStats["s"sv].AsArrayView()) - { - Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; - switch (Index) - { - case 0: - Timepoints["horde-execution-download-ref"] = Ticks; - break; - case 1: - Timepoints["horde-execution-download-input"] = Ticks; - break; - case 2: - Timepoints["horde-execution-execute"] = Ticks; - break; - case 3: - Timepoints["horde-execution-upload-log"] = Ticks; - break; - case 4: - Timepoints["horde-execution-upload-output"] = Ticks; - break; - case 5: - Timepoints["horde-execution-upload-ref"] = Ticks; - break; - } - Index++; - } - } - - [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash) - { - try - { - CloudCacheSession Session(m_StorageClient); - - GetUpstreamApplyResult ApplyResult{}; - - IoHash StdOutHash; - IoHash StdErrHash; - IoHash OutputHash; - - std::map<IoHash, IoBuffer> BinaryData; - - { - CloudCacheResult ObjectRefResult = - Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject); - Log().debug("Get ref {} Bytes={} Duration={}s Result={}", - ResultHash, - ObjectRefResult.Bytes, - ObjectRefResult.ElapsedSeconds, - ObjectRefResult.Success); - ApplyResult.Bytes += ObjectRefResult.Bytes; - ApplyResult.ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - ApplyResult.Timepoints["zen-storage-get-ref"] = DateTime::NowTicks(); - - if (!ObjectRefResult.Success) - { - ApplyResult.Error.Reason = "Failed to get result object data"; - return ApplyResult; - } - - CbObject ResultObject = LoadCompactBinaryObject(ObjectRefResult.Response); - ApplyResult.Error.ErrorCode = ResultObject["e"sv].AsInt32(); - StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); - StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); - OutputHash = ResultObject["o"sv].AsObjectAttachment(); - } - - { - std::set<IoHash> NeededData; - if (OutputHash != IoHash::Zero) - { - GetObjectReferencesResult ObjectReferenceResult = - Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash); - Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", - ResultHash, - ObjectReferenceResult.References.size(), - ObjectReferenceResult.Bytes, - ObjectReferenceResult.ElapsedSeconds, - ObjectReferenceResult.Success); - ApplyResult.Bytes += ObjectReferenceResult.Bytes; - ApplyResult.ElapsedSeconds += ObjectReferenceResult.ElapsedSeconds; - ApplyResult.Timepoints["zen-storage-get-object-references"] = DateTime::NowTicks(); - - if (!ObjectReferenceResult.Success) - { - ApplyResult.Error.Reason = "Failed to get result object references"; - return ApplyResult; - } - - NeededData = std::move(ObjectReferenceResult.References); - } - - NeededData.insert(OutputHash); - NeededData.insert(StdOutHash); - NeededData.insert(StdErrHash); - - for (const auto& Hash : NeededData) - { - if (Hash == IoHash::Zero) - { - continue; - } - CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash); - Log().debug("Get blob {} Bytes={} Duration={}s Result={}", - Hash, - BlobResult.Bytes, - BlobResult.ElapsedSeconds, - BlobResult.Success); - ApplyResult.Bytes += BlobResult.Bytes; - ApplyResult.ElapsedSeconds += BlobResult.ElapsedSeconds; - if (!BlobResult.Success) - { - ApplyResult.Error.Reason = "Failed to get blob"; - return ApplyResult; - } - BinaryData[Hash] = std::move(BlobResult.Response); - } - ApplyResult.Timepoints["zen-storage-get-blobs"] = DateTime::NowTicks(); - } - - ApplyResult.StdOut = StdOutHash != IoHash::Zero - ? std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()) - : ""; - ApplyResult.StdErr = StdErrHash != IoHash::Zero - ? std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()) - : ""; - - if (OutputHash == IoHash::Zero) - { - ApplyResult.Error.Reason = "Task completed with no output object"; - return ApplyResult; - } - - CbObject OutputObject = LoadCompactBinaryObject(BinaryData[OutputHash]); - - switch (ApplyType) - { - case UpstreamApplyType::Simple: - { - ResolveMerkleTreeDirectory(""sv, OutputHash, BinaryData, ApplyResult.OutputFiles); - for (const auto& Pair : BinaryData) - { - ApplyResult.FileData[Pair.first] = std::move(BinaryData.at(Pair.first)); - } - - ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; - return ApplyResult; - } - break; - case UpstreamApplyType::Asset: - { - if (ApplyResult.Error.ErrorCode != 0) - { - ApplyResult.Error.Reason = "Task completed with errors"; - return ApplyResult; - } - - // Get build.output - IoHash BuildOutputId; - IoBuffer BuildOutput; - for (auto& It : OutputObject["f"sv]) - { - const CbObjectView FileObject = It.AsObjectView(); - if (FileObject["n"sv].AsString() == "Build.output"sv) - { - BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); - BuildOutput = BinaryData[BuildOutputId]; - break; - } - } - - if (BuildOutput.GetSize() == 0) - { - ApplyResult.Error.Reason = "Build.output file not found in task results"; - return ApplyResult; - } - - // Get Output directory node - IoBuffer OutputDirectoryTree; - for (auto& It : OutputObject["d"sv]) - { - const CbObjectView DirectoryObject = It.AsObjectView(); - if (DirectoryObject["n"sv].AsString() == "Outputs"sv) - { - OutputDirectoryTree = BinaryData[DirectoryObject["h"sv].AsObjectAttachment()]; - break; - } - } - - if (OutputDirectoryTree.GetSize() == 0) - { - ApplyResult.Error.Reason = "Outputs directory not found in task results"; - return ApplyResult; - } - - // load build.output as CbObject - - // Move Outputs from Horde to CbPackage - - std::unordered_map<IoHash, IoHash> CidToCompressedId; - CbPackage OutputPackage; - CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); - - for (auto& It : OutputDirectoryTreeObject["f"sv]) - { - CbObjectView FileObject = It.AsObjectView(); - // Name is the uncompressed hash - IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); - // Hash is the compressed data hash, and how it is stored in Horde - IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); - - if (!BinaryData.contains(CompressedId)) - { - Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId); - ApplyResult.Error.Reason = "Object attachment chunk not retrieved from Horde"; - return ApplyResult; - } - CidToCompressedId[DecompressedId] = CompressedId; - } - - // Iterate attachments, verify all chunks exist, and add to CbPackage - bool AnyErrors = false; - CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); - BuildOutputObject.IterateAttachments([&](CbFieldView Field) { - const IoHash DecompressedId = Field.AsHash(); - if (!CidToCompressedId.contains(DecompressedId)) - { - Log().warn("Attachment not found {}", DecompressedId); - AnyErrors = true; - return; - } - const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); - - if (!BinaryData.contains(CompressedId)) - { - Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId); - AnyErrors = true; - return; - } - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer AttachmentBuffer = - CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]), RawHash, RawSize); - - if (!AttachmentBuffer || RawHash != DecompressedId) - { - Log().warn( - "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", - CompressedId, - DecompressedId); - AnyErrors = true; - return; - } - - ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - ApplyResult.TotalRawAttachmentBytes += RawSize; - - CbAttachment Attachment(AttachmentBuffer, DecompressedId); - OutputPackage.AddAttachment(Attachment); - }); - - if (AnyErrors) - { - ApplyResult.Error.Reason = "Failed to get result object attachment data"; - return ApplyResult; - } - - OutputPackage.SetObject(BuildOutputObject); - ApplyResult.OutputPackage = std::move(OutputPackage); - - ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; - return ApplyResult; - } - break; - } - - ApplyResult.Error.Reason = "Unknown apply type"; - return ApplyResult; - } - catch (std::exception& Err) - { - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; - } - } - - [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) - { - std::string ExecutablePath; - std::string WorkingDirectory; - std::vector<std::string> Arguments; - std::map<std::string, std::string> Environment; - std::set<std::filesystem::path> InputFiles; - std::set<std::string> Outputs; - std::map<std::filesystem::path, IoHash> InputFileHashes; - - ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); - if (ExecutablePath.empty()) - { - Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", - ApplyRecord.WorkerDescriptor.GetHash()); - return false; - } - - WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString(); - - for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) - { - CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) - { - return false; - } - } - - for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) - { - CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) - { - return false; - } - } - - for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) - { - std::string_view Directory = It.AsString(); - std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory); - InputFiles.insert(DummyFile); - Data.Blobs[EmptyBufferId] = EmptyBuffer; - InputFileHashes[DummyFile] = EmptyBufferId; - } - - if (!WorkingDirectory.empty()) - { - std::string DummyFile = fmt::format("{}/.zen_empty_file", WorkingDirectory); - InputFiles.insert(DummyFile); - Data.Blobs[EmptyBufferId] = EmptyBuffer; - InputFileHashes[DummyFile] = EmptyBufferId; - } - - for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) - { - std::string_view Env = It.AsString(); - auto Index = Env.find('='); - if (Index == std::string_view::npos) - { - Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); - return false; - } - - Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); - } - - switch (ApplyRecord.Type) - { - case UpstreamApplyType::Simple: - { - for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv]) - { - Arguments.push_back(std::string(It.AsString())); - } - - for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv]) - { - Outputs.insert(std::string(It.AsString())); - } - } - break; - case UpstreamApplyType::Asset: - { - static const std::filesystem::path BuildActionPath = "Build.action"sv; - static const std::filesystem::path InputPath = "Inputs"sv; - const IoHash ActionId = ApplyRecord.Action.GetHash(); - - Arguments.push_back("-Build=build.action"); - Outputs.insert("Build.output"); - Outputs.insert("Outputs"); - - InputFiles.insert(BuildActionPath); - InputFileHashes[BuildActionPath] = ActionId; - Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), - ApplyRecord.Action.GetBuffer().GetSize()); - - bool AnyErrors = false; - ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - - if (!m_CidStore.ContainsChunk(Cid)) - { - Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); - AnyErrors = true; - return; - } - - if (InputFiles.contains(FilePath)) - { - return; - } - - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = Cid; - Data.Cids.insert(Cid); - }); - - if (AnyErrors) - { - return false; - } - } - break; - } - - const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); - - CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects); - const IoHash SandboxHash = Sandbox.GetHash(); - Data.Objects[SandboxHash] = std::move(Sandbox); - - { - std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); - if (HostPlatform.empty()) - { - Log().warn("process apply upstream FAILED, 'host' platform not provided"); - return false; - } - - int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); - int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); - bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); - - std::string Condition = fmt::format("Platform == '{}'", HostPlatform); - if (HostPlatform == "Win64") - { - // TODO - // Condition += " && Pool == 'Win-RemoteExec'"; - } - - std::map<std::string_view, int64_t> Resources; - if (LogicalCores > 0) - { - Resources["LogicalCores"sv] = LogicalCores; - } - if (Memory > 0) - { - Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); - } - - CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); - const IoHash RequirementsId = Requirements.GetHash(); - Data.Objects[RequirementsId] = std::move(Requirements); - Data.RequirementsId = RequirementsId; - } - - CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs); - - const IoHash TaskId = Task.GetHash(); - Data.Objects[TaskId] = std::move(Task); - Data.TaskId = TaskId; - - return true; - } - - [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, - std::set<std::filesystem::path>& InputFiles, - std::map<std::filesystem::path, IoHash>& InputFileHashes, - std::set<IoHash>& CasIds) - { - const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); - const IoHash ChunkId = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - - if (!m_CidStore.ContainsChunk(ChunkId)) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); - return false; - } - - if (InputFiles.contains(FilePath)) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); - return false; - } - - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = ChunkId; - CasIds.insert(ChunkId); - return true; - } - - [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) - { - static const std::filesystem::path RootPath; - std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; - UpstreamDirectory RootDirectory = {.Path = RootPath}; - - AllDirectories[RootPath] = &RootDirectory; - - // Build tree from flat list - for (const auto& Path : InputFiles) - { - if (Path.has_parent_path()) - { - if (!AllDirectories.contains(Path.parent_path())) - { - std::stack<std::string> PathSplit; - { - std::filesystem::path ParentPath = Path.parent_path(); - PathSplit.push(ParentPath.filename().string()); - while (ParentPath.has_parent_path()) - { - ParentPath = ParentPath.parent_path(); - PathSplit.push(ParentPath.filename().string()); - } - } - UpstreamDirectory* ParentPtr = &RootDirectory; - while (!PathSplit.empty()) - { - if (!ParentPtr->Directories.contains(PathSplit.top())) - { - std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; - ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; - AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; - } - ParentPtr = &ParentPtr->Directories[PathSplit.top()]; - PathSplit.pop(); - } - } - - AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); - } - else - { - RootDirectory.Files.insert(Path.filename().string()); - } - } - - return RootDirectory; - } - - [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, - const std::map<std::filesystem::path, IoHash>& InputFileHashes, - const std::set<IoHash>& Cids, - std::map<IoHash, CbObject>& Objects) - { - CbObjectWriter DirectoryTreeWriter; - - if (!RootDirectory.Files.empty()) - { - DirectoryTreeWriter.BeginArray("f"sv); - for (const auto& File : RootDirectory.Files) - { - const std::filesystem::path FilePath = {RootDirectory.Path / File}; - const IoHash& FileHash = InputFileHashes.at(FilePath); - const bool Compressed = Cids.contains(FileHash); - DirectoryTreeWriter.BeginObject(); - DirectoryTreeWriter.AddString("n"sv, File); - DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); - DirectoryTreeWriter.AddBool("c"sv, Compressed); - DirectoryTreeWriter.EndObject(); - } - DirectoryTreeWriter.EndArray(); - } - - if (!RootDirectory.Directories.empty()) - { - DirectoryTreeWriter.BeginArray("d"sv); - for (const auto& Item : RootDirectory.Directories) - { - CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects); - const IoHash DirectoryHash = Directory.GetHash(); - Objects[DirectoryHash] = std::move(Directory); - - DirectoryTreeWriter.BeginObject(); - DirectoryTreeWriter.AddString("n"sv, Item.first); - DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); - DirectoryTreeWriter.EndObject(); - } - DirectoryTreeWriter.EndArray(); - } - - return DirectoryTreeWriter.Save(); - } - - void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory, - const IoHash& DirectoryHash, - const std::map<IoHash, IoBuffer>& Objects, - std::map<std::filesystem::path, IoHash>& OutputFiles) - { - CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash)); - - for (auto& It : Directory["f"sv]) - { - const CbObjectView FileObject = It.AsObjectView(); - const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString(); - - OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment(); - } - - for (auto& It : Directory["d"sv]) - { - const CbObjectView DirectoryObject = It.AsObjectView(); - - ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(), - DirectoryObject["h"sv].AsObjectAttachment(), - Objects, - OutputFiles); - } - } - - [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, - const std::map<std::string_view, int64_t>& Resources, - const bool Exclusive) - { - CbObjectWriter Writer; - Writer.AddString("c", Condition); - if (!Resources.empty()) - { - Writer.BeginArray("r"); - for (const auto& Resource : Resources) - { - Writer.BeginArray(); - Writer.AddString(Resource.first); - Writer.AddInteger(Resource.second); - Writer.EndArray(); - } - Writer.EndArray(); - } - Writer.AddBool("e", Exclusive); - return Writer.Save(); - } - - [[nodiscard]] CbObject BuildTask(const std::string_view Executable, - const std::vector<std::string>& Arguments, - const std::map<std::string, std::string>& Environment, - const std::string_view WorkingDirectory, - const IoHash& SandboxHash, - const IoHash& RequirementsId, - const std::set<std::string>& Outputs) - { - CbObjectWriter TaskWriter; - TaskWriter.AddString("e"sv, Executable); - - if (!Arguments.empty()) - { - TaskWriter.BeginArray("a"sv); - for (const auto& Argument : Arguments) - { - TaskWriter.AddString(Argument); - } - TaskWriter.EndArray(); - } - - if (!Environment.empty()) - { - TaskWriter.BeginArray("v"sv); - for (const auto& Env : Environment) - { - TaskWriter.BeginArray(); - TaskWriter.AddString(Env.first); - TaskWriter.AddString(Env.second); - TaskWriter.EndArray(); - } - TaskWriter.EndArray(); - } - - if (!WorkingDirectory.empty()) - { - TaskWriter.AddString("w"sv, WorkingDirectory); - } - - TaskWriter.AddObjectAttachment("s"sv, SandboxHash); - TaskWriter.AddObjectAttachment("r"sv, RequirementsId); - - // Outputs - if (!Outputs.empty()) - { - TaskWriter.BeginArray("o"sv); - for (const auto& Output : Outputs) - { - TaskWriter.AddString(Output); - } - TaskWriter.EndArray(); - } - - return TaskWriter.Save(); - } - }; -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<UpstreamApplyEndpoint> -UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& StorageAuthConfig, - CidStore& CidStore, - AuthMgr& Mgr) -{ - return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - CidStore, - Mgr); -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index 61d8a85cc..a67c497ad 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -281,7 +281,7 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) CloudCacheResult CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) { - ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); + ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; std::string KeyString = Key.ToHexString(); @@ -338,7 +338,7 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, IoHash& OutPayloadHash, std::filesystem::path TempFolderPath) { - ZEN_TRACE_CPU("HordeClient::GetInlineBlob"); + ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); ExtendableStringBuilder<256> Uri; std::string KeyString = Key.ToHexString(); @@ -401,7 +401,7 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, CloudCacheResult CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { - ZEN_TRACE_CPU("HordeClient::GetObject"); + ZEN_TRACE_CPU("JupiterClient::GetObject"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); @@ -453,7 +453,7 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) PutRefResult CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { - ZEN_TRACE_CPU("HordeClient::PutRef"); + ZEN_TRACE_CPU("JupiterClient::PutRef"); IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); @@ -524,7 +524,7 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, FinalizeRefResult CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { - ZEN_TRACE_CPU("HordeClient::FinalizeRef"); + ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" @@ -593,7 +593,7 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck CloudCacheResult CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { - ZEN_TRACE_CPU("HordeClient::PutBlob"); + ZEN_TRACE_CPU("JupiterClient::PutBlob"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); @@ -642,7 +642,7 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff CloudCacheResult CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { - ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); @@ -708,7 +708,7 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K CloudCacheResult CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) { - ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); @@ -766,7 +766,7 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K CloudCacheResult CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { - ZEN_TRACE_CPU("HordeClient::PutObject"); + ZEN_TRACE_CPU("JupiterClient::PutObject"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); @@ -815,7 +815,7 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu CloudCacheResult CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { - ZEN_TRACE_CPU("HordeClient::RefExists"); + ZEN_TRACE_CPU("JupiterClient::RefExists"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); @@ -860,7 +860,7 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket GetObjectReferencesResult CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { - ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); + ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; @@ -951,107 +951,6 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas return CacheTypeExists(Namespace, "objects"sv, Keys); } -CloudCacheResult -CloudCacheSession::PostComputeTasks(IoBuffer TasksData) -{ - ZEN_TRACE_CPU("HordeClient::PostComputeTasks"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - ZEN_WARN( - "CloudCacheSession::PostComputeTasks failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; -} - -CloudCacheResult -CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) -{ - ZEN_TRACE_CPU("HordeClient::GetComputeUpdates"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId - << "?wait=" << WaitSeconds; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - ZEN_WARN( - "CloudCacheSession::GetComputeUpdates failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - - return Result; -} - std::vector<IoHash> CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { @@ -1079,7 +978,7 @@ CloudCacheSession::GetAccessToken(bool RefreshToken) CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { - ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); @@ -1126,7 +1025,7 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view CloudCacheExistsResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) { - ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); ExtendableStringBuilder<256> Body; Body << "["; @@ -1311,7 +1210,7 @@ CloudCacheClient::~CloudCacheClient() CloudCacheAccessToken CloudCacheClient::AcquireAccessToken() { - ZEN_TRACE_CPU("HordeClient::AcquireAccessToken"); + ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken"); return m_TokenProvider->AcquireAccessToken(); } diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h index 4e7d11fd9..ca84905d2 100644 --- a/src/zenserver/upstream/jupiter.h +++ b/src/zenserver/upstream/jupiter.h @@ -126,9 +126,6 @@ public: CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); - CloudCacheResult PostComputeTasks(IoBuffer TasksData); - CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); - std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); CloudCacheClient& Client() { return *m_CacheClient; }; diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp deleted file mode 100644 index 3d29f2228..000000000 --- a/src/zenserver/upstream/upstreamapply.cpp +++ /dev/null @@ -1,459 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "upstreamapply.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/fmtutils.h> -# include <zencore/stream.h> -# include <zencore/timer.h> -# include <zencore/workthreadpool.h> - -# include <zenstore/cidstore.h> - -# include "diag/logging.h" - -# include <fmt/format.h> - -# include <atomic> - -namespace zen { - -using namespace std::literals; - -struct UpstreamApplyStats -{ - static constexpr uint64_t MaxSampleCount = 1000ull; - - UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} - - void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result) - { - UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); - - if (Result.Error) - { - Stats.ErrorCount.Increment(1); - } - else if (Result.Success) - { - Stats.PostCount.Increment(1); - Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024); - } - } - - void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result) - { - UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); - - if (Result.Error) - { - Stats.ErrorCount.Increment(1); - } - else if (Result.Success) - { - Stats.UpdateCount.Increment(1); - Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024); - if (!Result.Completed.empty()) - { - uint64_t Completed = 0; - for (auto& It : Result.Completed) - { - Completed += It.second.size(); - } - Stats.CompleteCount.Increment(Completed); - } - } - } - - bool m_Enabled; -}; - -////////////////////////////////////////////////////////////////////////// - -class UpstreamApplyImpl final : public UpstreamApply -{ -public: - UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore) - : m_Log(logging::Get("upstream-apply")) - , m_Options(Options) - , m_CidStore(CidStore) - , m_Stats(Options.StatsEnabled) - , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) - , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount) - { - } - - virtual ~UpstreamApplyImpl() { Shutdown(); } - - virtual bool Initialize() override - { - for (auto& Endpoint : m_Endpoints) - { - const UpstreamEndpointHealth Health = Endpoint->Initialize(); - if (Health.Ok) - { - Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); - } - else - { - Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); - } - } - - m_RunState.IsRunning = !m_Endpoints.empty(); - - if (m_RunState.IsRunning) - { - m_ShutdownEvent.Reset(); - - m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); - - m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); - } - - return m_RunState.IsRunning; - } - - virtual bool IsHealthy() const override - { - if (m_RunState.IsRunning) - { - for (const auto& Endpoint : m_Endpoints) - { - if (Endpoint->IsHealthy()) - { - return true; - } - } - } - - return false; - } - - virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override - { - m_Endpoints.emplace_back(std::move(Endpoint)); - } - - virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override - { - if (m_RunState.IsRunning) - { - const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); - const IoHash ActionId = ApplyRecord.Action.GetHash(); - const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); - - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - // Already in progress - return {.ApplyId = ActionId, .Success = true}; - } - - std::chrono::steady_clock::time_point ExpireTime = - TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) - : std::chrono::steady_clock::time_point::max(); - - m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; - } - - ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks(); - m_UpstreamAsyncWorkPool.ScheduleWork( - [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); - - return {.ApplyId = ActionId, .Success = true}; - } - - return {}; - } - - virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override - { - if (m_RunState.IsRunning) - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - return {.Status = *Status, .Success = true}; - } - } - - return {}; - } - - virtual void GetStatus(CbObjectWriter& Status) override - { - Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; - Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount(); - Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; - Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount(); - - Status.BeginArray("endpoints"); - for (const auto& Ep : m_Endpoints) - { - Status.BeginObject(); - Status << "name" << Ep->DisplayName(); - Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); - - UpstreamApplyEndpointStats& Stats = Ep->Stats(); - const uint64_t PostCount = Stats.PostCount.Value(); - const uint64_t CompleteCount = Stats.CompleteCount.Value(); - // const uint64_t UpdateCount = Stats.UpdateCount; - const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; - - Status << "post_count" << PostCount; - Status << "complete_count" << PostCount; - Status << "update_count" << Stats.UpdateCount.Value(); - - Status << "complete_ratio" << CompleteRate; - Status << "downloaded_mb" << Stats.DownBytes.Value(); - Status << "uploaded_mb" << Stats.UpBytes.Value(); - Status << "error_count" << Stats.ErrorCount.Value(); - - Status.EndObject(); - } - Status.EndArray(); - } - -private: - // The caller is responsible for locking if required - UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) - { - if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) - { - if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) - { - return &It2->second; - } - } - return nullptr; - } - - void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) - { - const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); - const IoHash ActionId = ApplyRecord.Action.GetHash(); - try - { - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->IsHealthy()) - { - ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks(); - PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - Status->Timepoints.merge(Result.Timepoints); - - if (Result.Success) - { - Status->State = UpstreamApplyState::Executing; - } - else - { - Status->State = UpstreamApplyState::Complete; - Status->Result = {.Error = std::move(Result.Error), - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds}; - } - } - } - m_Stats.Add(*Endpoint, Result); - return; - } - } - - Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); - - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - Status->State = UpstreamApplyState::Complete; - Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; - } - } - } - catch (std::exception& e) - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) - { - Status->State = UpstreamApplyState::Complete; - Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; - } - Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); - } - } - - void ProcessApplyUpdates() - { - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->IsHealthy()) - { - GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool); - m_Stats.Add(*Endpoint, Result); - - if (!Result.Success) - { - Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); - } - - if (!Result.Completed.empty()) - { - for (auto& It : Result.Completed) - { - for (auto& It2 : It.second) - { - std::scoped_lock Lock(m_ApplyTasksMutex); - if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) - { - Status->State = UpstreamApplyState::Complete; - Status->Result = std::move(It2.second); - Status->Result.Timepoints.merge(Status->Timepoints); - Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks(); - Status->Timepoints.clear(); - } - } - } - } - } - } - } - - void ProcessUpstreamUpdates() - { - const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); - while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) - { - if (!m_RunState.IsRunning) - { - break; - } - - ProcessApplyUpdates(); - - // Remove any expired tasks, regardless of state - { - std::scoped_lock Lock(m_ApplyTasksMutex); - for (auto& WorkerIt : m_ApplyTasks) - { - const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { - return Item.second.ExpireTime < std::chrono::steady_clock::now(); - }); - if (Count > 0) - { - Log().debug("Removed '{}' expired tasks", Count); - } - } - const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); - if (Count > 0) - { - Log().debug("Removed '{}' empty task lists", Count); - } - } - } - } - - void MonitorEndpoints() - { - for (;;) - { - { - std::unique_lock Lock(m_RunState.Mutex); - if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) - { - break; - } - } - - for (auto& Endpoint : m_Endpoints) - { - if (!Endpoint->IsHealthy()) - { - if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) - { - Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); - } - else - { - Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); - } - } - } - } - } - - void Shutdown() - { - if (m_RunState.Stop()) - { - m_ShutdownEvent.Set(); - m_EndpointMonitorThread.join(); - m_UpstreamUpdatesThread.join(); - m_Endpoints.clear(); - } - } - - spdlog::logger& Log() { return m_Log; } - - struct RunState - { - std::mutex Mutex; - std::condition_variable ExitSignal; - std::atomic_bool IsRunning{false}; - - bool Stop() - { - bool Stopped = false; - { - std::scoped_lock Lock(Mutex); - Stopped = IsRunning.exchange(false); - } - if (Stopped) - { - ExitSignal.notify_all(); - } - return Stopped; - } - }; - - spdlog::logger& m_Log; - UpstreamApplyOptions m_Options; - CidStore& m_CidStore; - UpstreamApplyStats m_Stats; - UpstreamApplyTasks m_ApplyTasks; - std::mutex m_ApplyTasksMutex; - std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; - Event m_ShutdownEvent; - WorkerThreadPool m_UpstreamAsyncWorkPool; - WorkerThreadPool m_DownstreamAsyncWorkPool; - std::thread m_UpstreamUpdatesThread; - std::thread m_EndpointMonitorThread; - RunState m_RunState; -}; - -////////////////////////////////////////////////////////////////////////// - -bool -UpstreamApply::IsHealthy() const -{ - return false; -} - -std::unique_ptr<UpstreamApply> -UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore) -{ - return std::make_unique<UpstreamApplyImpl>(Options, CidStore); -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/upstream/upstreamapply.h b/src/zenserver/upstream/upstreamapply.h deleted file mode 100644 index 4a095be6c..000000000 --- a/src/zenserver/upstream/upstreamapply.h +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <zencore/compactbinarypackage.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/stats.h> -# include <zencore/zencore.h> - -# include <chrono> -# include <map> -# include <unordered_map> -# include <unordered_set> - -namespace zen { - -class AuthMgr; -class CbObjectWriter; -class CidStore; -class CloudCacheTokenProvider; -class WorkerThreadPool; -class ZenCacheNamespace; -struct CloudCacheClientOptions; -struct UpstreamAuthConfig; - -enum class UpstreamApplyState : int32_t -{ - Queued = 0, - Executing = 1, - Complete = 2, -}; - -enum class UpstreamApplyType -{ - Simple = 0, - Asset = 1, -}; - -struct UpstreamApplyRecord -{ - CbObject WorkerDescriptor; - CbObject Action; - UpstreamApplyType Type; - std::map<std::string, uint64_t> Timepoints{}; -}; - -struct UpstreamApplyOptions -{ - std::chrono::seconds HealthCheckInterval{5}; - std::chrono::seconds UpdatesInterval{5}; - uint32_t UpstreamThreadCount = 4; - uint32_t DownstreamThreadCount = 4; - bool StatsEnabled = false; -}; - -struct UpstreamApplyError -{ - int32_t ErrorCode{}; - std::string Reason{}; - - explicit operator bool() const { return ErrorCode != 0; } -}; - -struct PostUpstreamApplyResult -{ - UpstreamApplyError Error{}; - int64_t Bytes{}; - double ElapsedSeconds{}; - std::map<std::string, uint64_t> Timepoints{}; - bool Success = false; -}; - -struct GetUpstreamApplyResult -{ - // UpstreamApplyType::Simple - std::map<std::filesystem::path, IoHash> OutputFiles{}; - std::map<IoHash, IoBuffer> FileData{}; - - // UpstreamApplyType::Asset - CbPackage OutputPackage{}; - int64_t TotalAttachmentBytes{}; - int64_t TotalRawAttachmentBytes{}; - - UpstreamApplyError Error{}; - int64_t Bytes{}; - double ElapsedSeconds{}; - std::string StdOut{}; - std::string StdErr{}; - std::string Agent{}; - std::string Detail{}; - std::map<std::string, uint64_t> Timepoints{}; - bool Success = false; -}; - -using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>; - -struct GetUpstreamApplyUpdatesResult -{ - UpstreamApplyError Error{}; - int64_t Bytes{}; - double ElapsedSeconds{}; - UpstreamApplyCompleted Completed{}; - bool Success = false; -}; - -struct UpstreamApplyStatus -{ - UpstreamApplyState State{}; - GetUpstreamApplyResult Result{}; - std::chrono::steady_clock::time_point ExpireTime{}; - std::map<std::string, uint64_t> Timepoints{}; -}; - -using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>; - -struct UpstreamEndpointHealth -{ - std::string Reason; - bool Ok = false; -}; - -struct UpstreamApplyEndpointStats -{ - metrics::Counter PostCount; - metrics::Counter CompleteCount; - metrics::Counter UpdateCount; - metrics::Counter ErrorCount; - metrics::Counter UpBytes; - metrics::Counter DownBytes; -}; - -/** - * The upstream apply endpoint is responsible for handling remote execution. - */ -class UpstreamApplyEndpoint -{ -public: - virtual ~UpstreamApplyEndpoint() = default; - - virtual UpstreamEndpointHealth Initialize() = 0; - virtual bool IsHealthy() const = 0; - virtual UpstreamEndpointHealth CheckHealth() = 0; - virtual std::string_view DisplayName() const = 0; - virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) = 0; - virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) = 0; - virtual UpstreamApplyEndpointStats& Stats() = 0; - - static std::unique_ptr<UpstreamApplyEndpoint> CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& StorageAuthConfig, - CidStore& CidStore, - AuthMgr& Mgr); -}; - -/** - * Manages one or more upstream compute endpoints. - */ -class UpstreamApply -{ -public: - virtual ~UpstreamApply() = default; - - virtual bool Initialize() = 0; - virtual bool IsHealthy() const = 0; - virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0; - - struct EnqueueResult - { - IoHash ApplyId{}; - bool Success = false; - }; - - struct StatusResult - { - UpstreamApplyStatus Status{}; - bool Success = false; - }; - - virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; - virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; - virtual void GetStatus(CbObjectWriter& CbO) = 0; - - static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CidStore& CidStore); -}; - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index ac647130e..5f711d0bc 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -498,7 +498,7 @@ namespace detail { } else { - ZEN_WARN("Horde request for inline payload of {}/{}/{} has hash {}, expected hash {} from header", + ZEN_WARN("Jupiter request for inline payload of {}/{}/{} has hash {}, expected hash {} from header", Namespace, Request.Key.Bucket, Request.Key.Hash.ToHexString(), diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 1f5c91a58..789952a3c 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -217,17 +217,6 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); -#if ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.ComputeServiceEnabled) - { - InitializeCompute(ServerOptions); - } - else - { - ZEN_INFO("NOT instantiating compute services"); - } -#endif // ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.StructuredCacheConfig.Enabled) { InitializeStructuredCache(ServerOptions); @@ -250,16 +239,6 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_Http->RegisterService(*m_CidService); -#if ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.ComputeServiceEnabled) - { - if (m_HttpFunctionService != nullptr) - { - m_Http->RegisterService(*m_HttpFunctionService); - } - } -#endif // ZEN_WITH_COMPUTE_SERVICES - m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); if (m_FrontendService) @@ -532,60 +511,6 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) m_Http->RegisterService(*m_UpstreamService); } -#if ZEN_WITH_COMPUTE_SERVICES -void -ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) -{ - ServerOptions; - const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; - - // Horde compute upstream - if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false) - { - ZEN_INFO("instantiating compute service"); - - std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; - - auto ComputeOptions = - CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.HordeConfig.Url, - .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, - .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; - - auto ComputeAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, - .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, - .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, - .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, - .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; - - auto StorageOptions = - CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, - .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, - .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; - - auto StorageAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, - .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, - .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, - .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, - .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; - - m_HttpFunctionService = std::make_unique<HttpFunctionService>(*m_CidStore, - ComputeOptions, - StorageOptions, - ComputeAuthConfig, - StorageAuthConfig, - *m_AuthMgr); - } - else - { - ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)"); - } -} -#endif // ZEN_WITH_COMPUTE_SERVICES - void ZenServer::Run() { @@ -679,10 +604,6 @@ ZenServer::Cleanup() m_UpstreamCache.reset(); m_CacheStore = {}; -#if ZEN_WITH_COMPUTE_SERVICES - m_HttpFunctionService.reset(); -#endif // ZEN_WITH_COMPUTE_SERVICES - m_HttpProjectService.reset(); m_ProjectStore = {}; m_CidService.reset(); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 2b1ae3842..0c28c1229 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -29,7 +29,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "admin/admin.h" #include "cache/httpstructuredcache.h" #include "cache/structuredcachestore.h" -#include "compute/function.h" #include "frontend/frontend.h" #include "httpcidstore.h" #include "objectstore/objectstore.h" @@ -131,14 +130,11 @@ private: std::unique_ptr<HttpUpstreamService> m_UpstreamService; std::unique_ptr<HttpStructuredCacheService> m_StructuredCacheService; HttpHealthService m_HealthService; -#if ZEN_WITH_COMPUTE_SERVICES - std::unique_ptr<HttpFunctionService> m_HttpFunctionService; -#endif - std::unique_ptr<HttpFrontendService> m_FrontendService; - std::unique_ptr<HttpObjectStoreService> m_ObjStoreService; - std::unique_ptr<VfsService> m_VfsService; - std::unique_ptr<JobQueue> m_JobQueue; - std::unique_ptr<HttpAdminService> m_AdminService; + std::unique_ptr<HttpFrontendService> m_FrontendService; + std::unique_ptr<HttpObjectStoreService> m_ObjStoreService; + std::unique_ptr<VfsService> m_VfsService; + std::unique_ptr<JobQueue> m_JobQueue; + std::unique_ptr<HttpAdminService> m_AdminService; bool m_DebugOptionForcedCrash = false; bool m_UseSentry = false; |