diff options
| author | Per Larsson <[email protected]> | 2021-12-09 17:01:57 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-09 17:01:57 +0100 |
| commit | 20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6 (patch) | |
| tree | fbb0f274840a12c32d93c7e342c0427f2617a651 /zenserver/upstream | |
| parent | Disabled cache tracker. (diff) | |
| parent | Return status_code as ErrorCode from jupiter api if not successful (diff) | |
| download | zen-20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6.tar.xz zen-20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6.zip | |
Merged main.
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 72 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 94 |
2 files changed, 112 insertions, 54 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 1f82f4a04..f9be068ec 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -116,7 +116,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -161,7 +165,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -198,7 +206,11 @@ CloudCacheSession::GetBlob(const IoHash& Key) const IoBuffer Buffer = Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -234,7 +246,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -270,7 +286,11 @@ CloudCacheSession::GetObject(const IoHash& Key) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -307,9 +327,12 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -365,6 +388,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer PutRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.ErrorCode = !Result.Success ? Response.status_code : 0; Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; @@ -429,6 +453,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con FinalizeRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.ErrorCode = !Result.Success ? Response.status_code : 0; Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; @@ -479,9 +504,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -514,9 +542,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -549,9 +580,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -585,7 +619,9 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + const bool Success = Response.status_code == 200; + + return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success}; } CloudCacheResult @@ -654,7 +690,9 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + const bool Success = Response.status_code == 200; + + return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success}; } CloudCacheResult @@ -690,7 +728,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -726,7 +768,11 @@ CloudCacheSession::GetObjectTree(const IoHash& Key) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } std::vector<IoHash> diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 0118902a6..05be5f65c 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -14,6 +14,7 @@ #include <zencore/session.h> #include <zencore/stats.h> #include <zencore/stream.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <zenstore/cas.h> @@ -36,6 +37,9 @@ namespace zen { using namespace std::literals; +static const IoBuffer EmptyBuffer; +static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); + namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint @@ -108,7 +112,8 @@ namespace detail { ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -121,7 +126,8 @@ namespace detail { ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -145,7 +151,8 @@ namespace detail { m_PendingTasks.erase(UpstreamData.TaskId); } - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -178,12 +185,12 @@ namespace detail { CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (ExistsResult.ErrorCode != 0) + if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode, - .Reason = std::move(ExistsResult.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; } // TODO: Batch upload missing blobs @@ -202,8 +209,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode, - .Reason = std::move(Result.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"}; } } @@ -229,12 +236,12 @@ namespace detail { CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (ExistsResult.ErrorCode != 0) + if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode, - .Reason = std::move(ExistsResult.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; } // TODO: Batch upload missing objects @@ -253,8 +260,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode, - .Reason = std::move(Result.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"}; } } @@ -323,7 +330,7 @@ namespace detail { CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; - if (UpdatesResult.ErrorCode != 0) + if (!UpdatesResult.Success) { return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, .Bytes = Bytes, @@ -456,13 +463,6 @@ namespace detail { Bytes += ObjectTreeResult.Bytes; ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (ObjectTreeResult.ErrorCode != 0) - { - return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - if (!ObjectTreeResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, @@ -509,7 +509,7 @@ namespace detail { CloudCacheResult ObjectResult = Session.GetObject(It.first); Bytes += ObjectTreeResult.Bytes; ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (ObjectTreeResult.ErrorCode != 0) + if (!ObjectTreeResult.Success) { return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, .Bytes = Bytes, @@ -533,7 +533,7 @@ namespace detail { CloudCacheResult BlobResult = Session.GetBlob(It.first); Bytes += ObjectTreeResult.Bytes; ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (BlobResult.ErrorCode != 0) + if (!BlobResult.Success) { return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, .Bytes = Bytes, @@ -588,7 +588,9 @@ namespace detail { { return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } // Get Output directory node @@ -607,7 +609,9 @@ namespace detail { { return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } // load build.output as CbObject @@ -682,7 +686,9 @@ namespace detail { { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } OutputPackage.SetObject(BuildOutputObject); @@ -704,6 +710,8 @@ namespace detail { [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) { + using namespace fmt::literals; + std::string ExecutablePath; std::map<std::string, std::string> Environment; std::set<std::filesystem::path> InputFiles; @@ -735,6 +743,15 @@ namespace detail { } } + for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) + { + std::string_view Directory = It.AsString(); + std::string DummyFile = "{}/.zen_empty_file"_format(Directory); + InputFiles.insert(DummyFile); + Data.Blobs[EmptyBufferId] = EmptyBuffer; + InputFileHashes[DummyFile] = EmptyBufferId; + } + for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) { std::string_view Env = It.AsString(); @@ -809,18 +826,10 @@ namespace detail { bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) - std::string Condition; - if (HostPlatform == "Win64" || HostPlatform == "Windows") + std::string Condition = "Platform == '{}'"_format(HostPlatform); + if (HostPlatform == "Win64") { - Condition = "OSFamily == 'Windows' && Pool == 'Win-RemoteExec'"; - } - else if (HostPlatform == "Mac") - { - Condition = "OSFamily == 'MacOS'"; - } - else - { - Condition = "OSFamily == '{}'"_format(HostPlatform); + Condition += " && Pool == 'Win-RemoteExec'"; } std::map<std::string_view, int64_t> Resources; @@ -1199,6 +1208,8 @@ public: if (m_RunState.IsRunning) { + m_ShutdownEvent.Reset(); + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); @@ -1422,11 +1433,9 @@ private: void ProcessUpstreamUpdates() { - const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval); - for (;;) + const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); + while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) { - std::this_thread::sleep_for(UpdateSleep); - if (!m_RunState.IsRunning) { break; @@ -1489,6 +1498,8 @@ private: { if (m_RunState.Stop()) { + m_ShutdownEvent.Set(); + m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { @@ -1536,6 +1547,7 @@ private: UpstreamApplyTasks m_ApplyTasks; std::mutex m_ApplyTasksMutex; std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; + Event m_ShutdownEvent; std::vector<std::thread> m_UpstreamThreads; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; |