diff options
| author | Per Larsson <[email protected]> | 2021-11-13 14:35:42 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-13 14:35:42 +0100 |
| commit | c2eb50ecfa58a11ffbacb0127d5ddf10ab7e5663 (patch) | |
| tree | 90c95750cb6972329aaddc91e69444bd09570b6b | |
| parent | Cleanup attachment validation. (diff) | |
| parent | Remote Apply: Get Expire timeout from worker (diff) | |
| download | zen-c2eb50ecfa58a11ffbacb0127d5ddf10ab7e5663.tar.xz zen-c2eb50ecfa58a11ffbacb0127d5ddf10ab7e5663.zip | |
Merge branch 'main' into zcache-batch
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 215 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 18 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 1492 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 172 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 6 |
6 files changed, 1841 insertions, 64 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 7a36b5841..4caa5c8df 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -182,9 +182,14 @@ CloudCacheSession::GetBlob(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const IoBuffer Buffer = + Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } @@ -214,6 +219,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -246,6 +255,10 @@ CloudCacheSession::GetObject(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -571,67 +584,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) CloudCacheResult CloudCacheSession::BlobExists(const IoHash& Key) { - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - if (!AccessToken.IsValid()) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - - cpr::Session& Session = m_SessionState->Session; - - Session.SetOption(cpr::Url{Uri.c_str()}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return CacheTypeExists("blobs"sv, Key); } CloudCacheResult CloudCacheSession::CompressedBlobExists(const IoHash& Key) { - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - if (!AccessToken.IsValid()) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - - cpr::Session& Session = m_SessionState->Session; + return CacheTypeExists("compressed-blobs"sv, Key); +} - Session.SetOption(cpr::Url{Uri.c_str()}); +CloudCacheResult +CloudCacheSession::ObjectExists(const IoHash& Key) +{ + return CacheTypeExists("objects"sv, Key); +} - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); +CloudCacheExistsResult +CloudCacheSession::BlobExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("blobs"sv, Keys); +} - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } +CloudCacheExistsResult +CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("compressed-blobs"sv, Keys); +} - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +CloudCacheExistsResult +CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("objects"sv, Keys); } CloudCacheResult -CloudCacheSession::ObjectExists(const IoHash& Key) +CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -640,14 +627,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key) } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; - cpr::Session& Session = m_SessionState->Session; + auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); if (Response.error) { @@ -662,7 +651,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key) } CloudCacheResult -CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -671,13 +660,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -698,7 +686,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa } CloudCacheResult -CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +CloudCacheSession::GetObjectTree(const IoHash& Key) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -707,15 +695,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree"; - auto& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); if (Response.error) { @@ -761,6 +749,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) return true; } +CloudCacheResult +CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + + cpr::Response Response = Session.Head(); + ZEN_DEBUG("HEAD {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +CloudCacheExistsResult +CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + ExtendableStringBuilder<256> Query; + for (const auto& Key : Keys) + { + Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString(); + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query; + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + CloudCacheExistsResult Result{ + CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; + + if (Result.Success) + { + IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + for (auto& Item : ExistsResponse["id"sv]) + { + if (Item.IsHash()) + { + Result.Have.insert(Item.AsHash()); + } + } + } + + return Result; +} + ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com @@ -778,7 +852,14 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options) , m_BlobStoreNamespace(Options.BlobStoreNamespace) , m_OAuthClientId(Options.OAuthClientId) , m_OAuthSecret(Options.OAuthSecret) +, m_AccessToken(Options.AccessToken) { + if (!Options.AccessToken.empty()) + { + // If an access token was provided, OAuth settings are not used. + return; + } + if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv)) { ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider); @@ -828,6 +909,12 @@ CloudCacheClient::AcquireAccessToken() { using namespace std::chrono; + // If an access token was provided, return it instead of querying OAuth + if (!m_AccessToken.empty()) + { + return {m_AccessToken, steady_clock::time_point::max()}; + } + ExtendableStringBuilder<128> OAuthFormData; OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 9471ef64f..13d65587e 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -12,6 +12,7 @@ #include <chrono> #include <list> #include <memory> +#include <set> #include <vector> struct ZenCacheValue; @@ -64,6 +65,11 @@ struct FinalizeRefResult : CloudCacheResult std::vector<IoHash> Needs; }; +struct CloudCacheExistsResult : CloudCacheResult +{ + std::set<IoHash> Have; +}; + /** * Context for performing Jupiter operations * @@ -95,12 +101,18 @@ public: FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); + CloudCacheResult BlobExists(const IoHash& Key); CloudCacheResult CompressedBlobExists(const IoHash& Key); CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); + CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); + CloudCacheResult GetObjectTree(const IoHash& Key); std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); @@ -109,6 +121,10 @@ private: const CloudCacheAccessToken& GetAccessToken(); bool VerifyAccessToken(long StatusCode); + CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key); + + CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys); + spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; detail::CloudCacheSessionState* m_SessionState; @@ -122,6 +138,7 @@ struct CloudCacheClientOptions std::string_view OAuthProvider; std::string_view OAuthClientId; std::string_view OAuthSecret; + std::string_view AccessToken; bool UseLegacyDdc = false; }; @@ -152,6 +169,7 @@ private: std::string m_BlobStoreNamespace; std::string m_OAuthClientId; std::string m_OAuthSecret; + std::string m_AccessToken; bool m_IsValid = false; RwLock m_SessionStateLock; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp new file mode 100644 index 000000000..3f1b0d8f9 --- /dev/null +++ b/zenserver/upstream/upstreamapply.cpp @@ -0,0 +1,1492 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamapply.h" +#include "jupiter.h" +#include "zen.h" + +#include <zencore/blockingqueue.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/session.h> +#include <zencore/stats.h> +#include <zencore/stream.h> +#include <zencore/timer.h> + +#include <zenstore/cas.h> +#include <zenstore/cidstore.h> + +#include "cache/structuredcachestore.h" +#include "diag/logging.h" + +#include <fmt/format.h> + +#include <algorithm> +#include <atomic> +#include <map> +#include <set> +#include <stack> +#include <thread> +#include <unordered_map> + +namespace zen { + +using namespace std::literals; + +namespace detail { + + class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint + { + public: + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore) + : m_Log(logging::Get("upstream-apply")) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + { + using namespace fmt::literals; + m_DisplayName = "Horde - '{}'"_format(Options.ServiceUrl); + m_Client = new CloudCacheClient(Options); + m_ChannelId = "zen-{}"_format(zen::GetSessionIdString()); + } + + virtual ~HordeUpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override + { + try + { + CloudCacheSession Session(m_Client); + CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } + } + + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + + try + { + UpstreamData UpstreamData; + if (!ProcessApplyKey(ApplyRecord, UpstreamData)) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; + } + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.contains(UpstreamData.TaskId)) + { + // Pending task is already queued, return success + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; + } + + CloudCacheSession Session(m_Client); + + { + CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + UpstreamData.Blobs.clear(); + } + + { + CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + UpstreamData.Objects.clear(); + } + + CbObjectWriter Writer; + Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); + Writer.BeginArray("t"sv); + Writer.AddObjectAttachment(UpstreamData.TaskId); + Writer.EndArray(); + IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); + + CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + { + std::scoped_lock Lock(m_TaskMutex); + m_PendingTasks.erase(UpstreamData.TaskId); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + } + + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) + { + if (Blobs.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing blobs + std::set<IoHash> Keys; + for (const auto& It : Blobs) + { + Keys.insert(It.first); + } + + CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (ExistsResult.ErrorCode != 0) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode, + .Reason = std::move(ExistsResult.Reason)}; + } + + // TODO: Batch upload missing blobs + + for (const auto& It : Blobs) + { + if (ExistsResult.Have.contains(It.first)) + { + continue; + } + + CloudCacheResult Result = Session.PutBlob(It.first, It.second); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode, + .Reason = std::move(Result.Reason)}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) + { + if (Objects.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing objects + std::set<IoHash> Keys; + for (const auto& It : Objects) + { + Keys.insert(It.first); + } + + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (ExistsResult.ErrorCode != 0) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode, + .Reason = std::move(ExistsResult.Reason)}; + } + + // TODO: Batch upload missing objects + + for (const auto& It : Objects) + { + if (ExistsResult.Have.contains(It.first)) + { + continue; + } + + CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer()); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode, + .Reason = std::move(Result.Reason)}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + enum class ComputeTaskState : int32_t + { + Queued = 0, + Executing = 1, + Complete = 2, + }; + + enum class ComputeTaskOutcome : int32_t + { + Success = 0, + Failed = 1, + Cancelled = 2, + NoResult = 3, + Exipred = 4, + BlobNotFound = 5, + Exception = 6, + }; + + virtual GetUpstreamApplyUpdatesResult GetUpdates() override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + UpstreamApplyCompleted CompletedTasks; + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.empty()) + { + // Nothing to do. + return {.Success = true}; + } + } + + try + { + CloudCacheSession Session(m_Client); + + CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); + Bytes += UpdatesResult.Bytes; + ElapsedSeconds += UpdatesResult.ElapsedSeconds; + if (UpdatesResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + + CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); + + // zen::StringBuilder<4096> ObjStr; + // zen::CompactBinaryToJson(TaskStatus, ObjStr); + + for (auto& It : TaskStatus["u"sv]) + { + CbObjectView Status = It.AsObjectView(); + const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + + const std::string_view AgentId = TaskStatus["a"sv].AsString(); + const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + + // Only care about completed tasks + if (State != ComputeTaskState::Complete) + { + continue; + } + + const IoHash TaskId = Status["h"sv].AsObjectAttachment(); + + IoHash WorkerId; + IoHash ActionId; + + { + std::scoped_lock Lock(m_TaskMutex); + auto TaskIt = m_PendingTasks.find(TaskId); + if (TaskIt == m_PendingTasks.end()) + { + continue; + } + WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); + ActionId = TaskIt->second.Action.GetHash(); + m_PendingTasks.erase(TaskIt); + } + + GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + + CompletedTasks[WorkerId][ActionId] = std::move(Result); + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return { + .Error{.ErrorCode = -1, .Reason = Err.what()}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .Completed = std::move(CompletedTasks), + }; + } + } + + virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } + + private: + spdlog::logger& Log() { return m_Log; } + + CasStore& m_CasStore; + CidStore& m_CidStore; + spdlog::logger& m_Log; + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + UpstreamApplyEndpointStats m_Stats; + std::atomic_bool m_HealthOk{false}; + std::string m_ChannelId; + + std::mutex m_TaskMutex; + std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; + + struct UpstreamData + { + std::map<IoHash, IoBuffer> Blobs; + std::map<IoHash, CbObject> Objects; + IoHash TaskId; + IoHash RequirementsId; + }; + + struct UpstreamDirectory + { + std::filesystem::path Path; + std::map<std::string, UpstreamDirectory> Directories; + std::set<std::string> Files; + }; + + [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session) + { + try + { + const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32(); + + if (Outcome != ComputeTaskOutcome::Success) + { + const std::string_view Detail = TaskStatus["d"sv].AsString(); + return {.Error{.ErrorCode = -1, .Reason = std::string(Detail)}}; + } + + const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); + const DateTime Time = TaskStatus["t"sv].AsDateTime(); + const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment(); + const std::string_view AgentId = TaskStatus["a"sv].AsString(); + const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Get Result object and all Object Attachments + Binary Attachment IDs + CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + + if (ObjectTreeResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!ObjectTreeResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + std::map<IoHash, IoBuffer> TreeObjectData; + std::map<IoHash, IoBuffer> TreeBinaryData; + + MemoryView ResponseView = ObjectTreeResult.Response; + while (ResponseView.GetSize() > 0) + { + CbFieldView Field = CbFieldView(ResponseView.GetData()); + ResponseView += Field.GetSize(); + if (Field.IsObjectAttachment()) + { + const IoHash Hash = Field.AsObjectAttachment(); + Field = CbFieldView(ResponseView.GetData()); + ResponseView += Field.GetSize(); + if (!Field.IsObject()) // No data + { + TreeObjectData[Hash] = {}; + continue; + } + MemoryView FieldView = Field.AsObjectView().GetView(); + + TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize()); + } + else if (Field.IsBinaryAttachment()) + { + const IoHash Hash = Field.AsBinaryAttachment(); + TreeBinaryData[Hash] = {}; + } + else // Unknown type + { + } + } + + for (auto& It : TreeObjectData) + { + if (It.second.GetSize() == 0) + { + CloudCacheResult ObjectResult = Session.GetObject(It.first); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + if (ObjectTreeResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!ObjectResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + It.second = std::move(ObjectResult.Response); + } + } + + for (auto& It : TreeBinaryData) + { + if (It.second.GetSize() == 0) + { + CloudCacheResult BlobResult = Session.GetBlob(It.first); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + if (BlobResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!BlobResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + It.second = std::move(BlobResult.Response); + } + } + + CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]); + int32_t ExitCode = ResultObject["e"sv].AsInt32(); + IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); + IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); + IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); + + std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize()); + std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize()); + + if (ExitCode != 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; + } + + CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]); + + // Get build.output + IoHash BuildOutputId; + IoBuffer BuildOutput; + for (auto& It : OutputObject["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + if (FileObject["n"sv].AsString() == "Build.output"sv) + { + BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); + BuildOutput = TreeBinaryData[BuildOutputId]; + break; + } + } + + if (BuildOutput.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + // Get Output directory node + IoBuffer OutputDirectoryTree; + for (auto& It : OutputObject["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + if (DirectoryObject["n"sv].AsString() == "Outputs"sv) + { + OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + break; + } + } + + if (OutputDirectoryTree.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + // load build.output as CbObject + + // Move Outputs from Horde to CbPackage + + std::unordered_map<IoHash, IoHash> CidToCompressedId; + CbPackage OutputPackage; + CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); + int64_t TotalAttachmentBytes = 0; + int64_t TotalRawAttachmentBytes = 0; + + for (auto& It : OutputDirectoryTreeObject["f"sv]) + { + CbObjectView FileObject = It.AsObjectView(); + // Name is the uncompressed hash + IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); + // Hash is the compressed data hash, and how it is stored in Horde + IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + + if (!TreeBinaryData.contains(CompressedId)) + { + Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); + return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + CidToCompressedId[DecompressedId] = CompressedId; + } + + // Iterate attachments, verify all chunks exist, and add to CbPackage + bool AnyErrors = false; + CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); + BuildOutputObject.IterateAttachments([&](CbFieldView Field) { + const IoHash DecompressedId = Field.AsHash(); + if (!CidToCompressedId.contains(DecompressedId)) + { + Log().warn("Attachment not found {}", DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + + if (!TreeBinaryData.contains(CompressedId)) + { + Log().warn("Missing output {} compressed {} uncompressed", + CompressedId.ToHexString(), + DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId])); + + if (!AttachmentBuffer) + { + Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", + CompressedId.ToHexString(), + DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + + CbAttachment Attachment(AttachmentBuffer); + OutputPackage.AddAttachment(Attachment); + }); + + if (AnyErrors) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + OutputPackage.SetObject(BuildOutputObject); + + return {.OutputPackage = std::move(OutputPackage), + .TotalAttachmentBytes = TotalAttachmentBytes, + .TotalRawAttachmentBytes = TotalRawAttachmentBytes, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr), + .Success = true}; + } + catch (std::exception& Err) + { + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) + { + std::string ExecutablePath; + std::map<std::string, std::string> Environment; + std::set<std::filesystem::path> InputFiles; + std::map<std::filesystem::path, IoHash> InputFileHashes; + + ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); + if (ExecutablePath.empty()) + { + Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", + ApplyRecord.WorkerDescriptor.GetHash()); + return false; + } + + for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) + { + std::string_view Env = It.AsString(); + auto Index = Env.find('='); + if (Index < 0) + { + Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); + return false; + } + + Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); + } + + { + static const std::filesystem::path BuildActionPath = "Build.action"sv; + static const std::filesystem::path InputPath = "Inputs"sv; + const IoHash ActionId = ApplyRecord.Action.GetHash(); + + InputFiles.insert(BuildActionPath); + InputFileHashes[BuildActionPath] = ActionId; + Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), + ApplyRecord.Action.GetBuffer().GetSize()); + + bool AnyErrors = false; + ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); + AnyErrors = true; + return; + } + + if (InputFiles.contains(FilePath)) + { + return; + } + + const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = CompressedId; + Data.Blobs[CompressedId] = std::move(DataBuffer); + }); + + if (AnyErrors) + { + return false; + } + } + + const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); + + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); + const IoHash SandboxHash = Sandbox.GetHash(); + Data.Objects[SandboxHash] = std::move(Sandbox); + + CbObject Requirements = BuildRequirements("OSFamily == 'Windows'"sv, {}, false); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; + + CbObject Task = BuildTask(ExecutablePath, + {"-Build=build.action"}, + Environment, + {}, + SandboxHash, + RequirementsId, + {"Build.output", "Outputs"}); + + const IoHash TaskId = Task.GetHash(); + Data.Objects[TaskId] = std::move(Task); + Data.TaskId = TaskId; + + return true; + } + + [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, + std::set<std::filesystem::path>& InputFiles, + std::map<std::filesystem::path, IoHash>& InputFileHashes, + std::map<IoHash, IoBuffer>& Blobs) + { + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); + return false; + } + + if (DataBuffer.Size() != Size) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", + ChunkId, + DataBuffer.Size(), + Size); + return false; + } + + if (InputFiles.contains(FilePath)) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); + return false; + } + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = ChunkId; + Blobs[ChunkId] = std::move(DataBuffer); + return true; + } + + [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) + { + static const std::filesystem::path RootPath; + std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; + UpstreamDirectory RootDirectory = {.Path = RootPath}; + + AllDirectories[RootPath] = &RootDirectory; + + // Build tree from flat list + for (const auto& Path : InputFiles) + { + if (Path.has_parent_path()) + { + if (!AllDirectories.contains(Path.parent_path())) + { + std::stack<std::string> PathSplit; + { + std::filesystem::path ParentPath = Path.parent_path(); + PathSplit.push(ParentPath.filename().string()); + while (ParentPath.has_parent_path()) + { + ParentPath = ParentPath.parent_path(); + PathSplit.push(ParentPath.filename().string()); + } + } + UpstreamDirectory* ParentPtr = &RootDirectory; + while (!PathSplit.empty()) + { + if (!ParentPtr->Directories.contains(PathSplit.top())) + { + std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; + ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; + AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; + } + ParentPtr = &ParentPtr->Directories[PathSplit.top()]; + PathSplit.pop(); + } + } + + AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); + } + else + { + RootDirectory.Files.insert(Path.filename().string()); + } + } + + return RootDirectory; + } + + [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, + const std::map<std::filesystem::path, IoHash>& InputFileHashes, + const std::map<IoHash, IoBuffer>& Blobs, + std::map<IoHash, CbObject>& Objects) + { + CbObjectWriter DirectoryTreeWriter; + + if (!RootDirectory.Files.empty()) + { + DirectoryTreeWriter.BeginArray("f"sv); + for (const auto& File : RootDirectory.Files) + { + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const uint64_t FileSize = Blobs.at(FileHash).Size(); + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, File); + DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); + DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size + // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + if (!RootDirectory.Directories.empty()) + { + DirectoryTreeWriter.BeginArray("d"sv); + for (const auto& Item : RootDirectory.Directories) + { + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); + const IoHash DirectoryHash = Directory.GetHash(); + Objects[DirectoryHash] = std::move(Directory); + + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, Item.first); + DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + return std::move(DirectoryTreeWriter.Save()); + } + + [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, + const std::map<std::string_view, int32_t>& Resources, + const bool Exclusive) + { + CbObjectWriter Writer; + Writer.AddString("c", Condition); + if (!Resources.empty()) + { + Writer.BeginArray("r"); + for (const auto& Resource : Resources) + { + Writer.BeginArray(); + Writer.AddString(Resource.first); + Writer.AddInteger(Resource.second); + Writer.EndArray(); + } + Writer.EndArray(); + } + Writer.AddBool("e", Exclusive); + return std::move(Writer.Save()); + } + + [[nodiscard]] CbObject BuildTask(const std::string_view Executable, + const std::vector<std::string>& Arguments, + const std::map<std::string, std::string>& Environment, + const std::string_view WorkingDirectory, + const IoHash& SandboxHash, + const IoHash& RequirementsId, + const std::set<std::string>& Outputs) + { + CbObjectWriter TaskWriter; + TaskWriter.AddString("e"sv, Executable); + + if (!Arguments.empty()) + { + TaskWriter.BeginArray("a"sv); + for (const auto& Argument : Arguments) + { + TaskWriter.AddString(Argument); + } + TaskWriter.EndArray(); + } + + if (!Environment.empty()) + { + TaskWriter.BeginArray("v"sv); + for (const auto& Env : Environment) + { + TaskWriter.BeginArray(); + TaskWriter.AddString(Env.first); + TaskWriter.AddString(Env.second); + TaskWriter.EndArray(); + } + TaskWriter.EndArray(); + } + + if (!WorkingDirectory.empty()) + { + TaskWriter.AddString("s"sv, WorkingDirectory); + } + + TaskWriter.AddObjectAttachment("s"sv, SandboxHash); + TaskWriter.AddObjectAttachment("r"sv, RequirementsId); + + // Outputs + if (!Outputs.empty()) + { + TaskWriter.BeginArray("o"sv); + for (const auto& Output : Outputs) + { + TaskWriter.AddString(Output); + } + TaskWriter.EndArray(); + } + + return std::move(TaskWriter.Save()); + } + }; +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +struct UpstreamApplyStats +{ + static constexpr uint64_t MaxSampleCount = 1000ull; + + UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} + + void Add(spdlog::logger& Logger, + UpstreamApplyEndpoint& Endpoint, + const PostUpstreamApplyResult& Result, + const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + { + UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) + { + Stats.PostCount++; + Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); + } + + if (m_Enabled && m_SampleCount++ % MaxSampleCount) + { + Dump(Logger, Endpoints); + } + } + + void Add(spdlog::logger& Logger, + UpstreamApplyEndpoint& Endpoint, + const GetUpstreamApplyUpdatesResult& Result, + const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + { + UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) + { + Stats.UpdateCount++; + Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); + if (!Result.Completed.empty()) + { + uint64_t Completed = 0; + for (auto& It : Result.Completed) + { + Completed += It.second.size(); + } + Stats.CompleteCount.fetch_add(Completed); + } + } + + if (m_Enabled && m_SampleCount++ % MaxSampleCount) + { + Dump(Logger, Endpoints); + } + } + + void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + { + for (auto& Ep : Endpoints) + { + // These stats will not be totally correct as the numbers are not captured atomically + + UpstreamApplyEndpointStats& Stats = Ep->Stats(); + const uint64_t PostCount = Stats.PostCount; + const uint64_t CompleteCount = Stats.CompleteCount; + const uint64_t UpdateCount = Stats.UpdateCount; + const double DownBytes = Stats.DownBytes; + const double SecondsDown = Stats.SecondsDown; + const double UpBytes = Stats.UpBytes; + const double SecondsUp = Stats.SecondsUp; + + const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; + const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; + const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; + + Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + CompleteRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); + } + } + + bool m_Enabled; + std::atomic_uint64_t m_SampleCount = {}; +}; + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamApply final : public UpstreamApply +{ +public: + DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + : m_Log(logging::Get("upstream-apply")) + , m_Options(Options) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + , m_Stats(Options.StatsEnabled) + { + } + + virtual ~DefaultUpstreamApply() { Shutdown(); } + + virtual bool Initialize() override + { + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->Initialize(); + if (Health.Ok) + { + Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + + m_RunState.IsRunning = !m_Endpoints.empty(); + + if (m_RunState.IsRunning) + { + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); + } + + m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this); + + m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this); + } + + return m_RunState.IsRunning; + } + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override + { + m_Endpoints.emplace_back(std::move(Endpoint)); + } + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override + { + if (m_RunState.IsRunning) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + // Already in progress + return {.ApplyId = ActionId, .Success = true}; + } + + std::chrono::steady_clock::time_point ExpireTime = + TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) + : std::chrono::steady_clock::time_point::max(); + + m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; + } + + if (!m_UpstreamThreads.empty()) + { + m_UpstreamQueue.Enqueue(std::move(ApplyRecord)); + } + else + { + ProcessApplyRecord(std::move(ApplyRecord)); + } + + return {.ApplyId = ActionId, .Success = true}; + } + + return {}; + } + + virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override + { + if (m_RunState.IsRunning) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + return {.Status = *Status, .Success = true}; + } + } + + return {}; + } + + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamApplyEndpointStats& Stats = Ep->Stats(); + const uint64_t PostCount = Stats.PostCount; + const uint64_t CompleteCount = Stats.CompleteCount; + const uint64_t UpdateCount = Stats.UpdateCount; + const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; + + Status << "post_count" << PostCount; + Status << "complete_count" << PostCount; + Status << "update_count" << Stats.UpdateCount; + + Status << "complete_ratio" << CompleteRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + +private: + // The caller is responsible for locking if required + UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) + { + if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) + { + if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) + { + return &It2->second; + } + } + return nullptr; + } + + void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + try + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + if (Result.Success) + { + Status->State = UpstreamApplyState::Executing; + } + else + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error = std::move(Result.Error), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds}; + } + } + } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + return; + } + } + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; + } + Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); + } + } + catch (std::exception& e) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; + } + Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); + } + } + + void ProcessUpstreamQueue() + { + for (;;) + { + UpstreamApplyRecord ApplyRecord; + if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord)) + { + ProcessApplyRecord(std::move(ApplyRecord)); + } + + if (!m_RunState.IsRunning) + { + break; + } + } + } + + void ProcessApplyUpdates() + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); + } + + if (!Result.Completed.empty()) + { + for (auto& It : Result.Completed) + { + for (auto& It2 : It.second) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = std::move(It2.second); + } + } + } + } + } + } + } + + void ProcessUpstreamUpdates() + { + const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval); + for (;;) + { + std::this_thread::sleep_for(UpdateSleep); + + if (!m_RunState.IsRunning) + { + break; + } + + ProcessApplyUpdates(); + + // Remove any expired tasks, regardless of state + { + std::scoped_lock Lock(m_ApplyTasksMutex); + for (auto& WorkerIt : m_ApplyTasks) + { + const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { + return Item.second.ExpireTime < std::chrono::steady_clock::now(); + }); + if (Count > 0) + { + Log().debug("Removed '{}' expired tasks", Count); + } + } + const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); + if (Count > 0) + { + Log().debug("Removed '{}' empty task lists", Count); + } + } + } + } + + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock Lock(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + + void Shutdown() + { + if (m_RunState.Stop()) + { + m_UpstreamQueue.CompleteAdding(); + for (std::thread& Thread : m_UpstreamThreads) + { + Thread.join(); + } + + m_EndpointMonitorThread.join(); + m_UpstreamUpdatesThread.join(); + m_UpstreamThreads.clear(); + m_Endpoints.clear(); + } + } + + spdlog::logger& Log() { return m_Log; } + + using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>; + + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::scoped_lock Lock(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; + + spdlog::logger& m_Log; + UpstreamApplyOptions m_Options; + CasStore& m_CasStore; + CidStore& m_CidStore; + UpstreamApplyQueue m_UpstreamQueue; + UpstreamApplyStats m_Stats; + UpstreamApplyTasks m_ApplyTasks; + std::mutex m_ApplyTasksMutex; + std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::thread m_UpstreamUpdatesThread; + std::thread m_EndpointMonitorThread; + RunState m_RunState; +}; + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamApply> +MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +{ + return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore); +} + +std::unique_ptr<UpstreamApplyEndpoint> +MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore) +{ + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, CasStore, CidStore); +} + +} // namespace zen diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h new file mode 100644 index 000000000..8f72660c7 --- /dev/null +++ b/zenserver/upstream/upstreamapply.h @@ -0,0 +1,172 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinarypackage.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/zencore.h> + +#include <atomic> +#include <chrono> +#include <memory> +#include <unordered_map> +#include <unordered_set> + +namespace zen { + +class CbObjectWriter; +class CasStore; +class CidStore; +class ZenCacheStore; +struct CloudCacheClientOptions; + +enum class UpstreamApplyState : int32_t +{ + Queued = 0, + Executing = 1, + Complete = 2, +}; + +struct UpstreamApplyRecord +{ + CbObject WorkerDescriptor; + CbObject Action; +}; + +struct UpstreamApplyOptions +{ + std::chrono::seconds HealthCheckInterval{5}; + std::chrono::seconds UpdatesInterval{5}; + uint32_t ThreadCount = 4; + bool StatsEnabled = false; +}; + +struct UpstreamApplyError +{ + int32_t ErrorCode{}; + std::string Reason{}; + + explicit operator bool() const { return ErrorCode != 0; } +}; + +struct PostUpstreamApplyResult +{ + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct GetUpstreamApplyResult +{ + CbPackage OutputPackage{}; + int64_t TotalAttachmentBytes{}; + int64_t TotalRawAttachmentBytes{}; + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + std::string StdOut{}; + std::string StdErr{}; + bool Success = false; +}; + +using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>; + +struct GetUpstreamApplyUpdatesResult +{ + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + UpstreamApplyCompleted Completed{}; + bool Success = false; +}; + +struct UpstreamApplyStatus +{ + UpstreamApplyState State{}; + GetUpstreamApplyResult Result{}; + std::chrono::steady_clock::time_point ExpireTime{}; +}; + +using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>; + +struct UpstreamEndpointHealth +{ + std::string Reason; + bool Ok = false; +}; + +struct UpstreamApplyEndpointStats +{ + std::atomic_uint64_t PostCount{}; + std::atomic_uint64_t CompleteCount{}; + std::atomic_uint64_t UpdateCount{}; + std::atomic_uint64_t ErrorCount{}; + std::atomic<double> UpBytes{}; + std::atomic<double> DownBytes{}; + std::atomic<double> SecondsUp{}; + std::atomic<double> SecondsDown{}; +}; + +/** + * The upstream apply endpont is responsible for handling remote execution. + */ +class UpstreamApplyEndpoint +{ +public: + virtual ~UpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() = 0; + + virtual bool IsHealthy() const = 0; + + virtual UpstreamEndpointHealth CheckHealth() = 0; + + virtual std::string_view DisplayName() const = 0; + + virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0; + + virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0; + + virtual UpstreamApplyEndpointStats& Stats() = 0; +}; + +/** + * Manages one or more upstream cache endpoints. + */ +class UpstreamApply +{ +public: + virtual ~UpstreamApply() = default; + + virtual bool Initialize() = 0; + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0; + + struct EnqueueResult + { + IoHash ApplyId{}; + bool Success = false; + }; + + struct StatusResult + { + UpstreamApplyStatus Status{}; + bool Success = false; + }; + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; + + virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; +}; + +std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); + +std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, + CasStore& CasStore, + CidStore& CidStore); + +} // namespace zen diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index d954d3f8d..480d5dd15 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -125,6 +125,7 @@ <ClInclude Include="diag\diagsvcs.h" /> <ClInclude Include="experimental\usnjournal.h" /> <ClInclude Include="targetver.h" /> + <ClInclude Include="upstream\upstreamapply.h" /> <ClInclude Include="upstream\upstreamcache.h" /> <ClInclude Include="upstream\zen.h" /> <ClInclude Include="windows\service.h" /> @@ -147,6 +148,7 @@ <ClCompile Include="testing\launch.cpp" /> <ClCompile Include="casstore.cpp" /> <ClCompile Include="experimental\usnjournal.cpp" /> + <ClCompile Include="upstream\upstreamapply.cpp" /> <ClCompile Include="upstream\upstreamcache.cpp" /> <ClCompile Include="upstream\zen.cpp" /> <ClCompile Include="windows\service.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 04c6267ba..6de9230d3 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -41,6 +41,9 @@ <ClInclude Include="experimental\vfs.h" /> <ClInclude Include="monitoring\httpstats.h" /> <ClInclude Include="monitoring\httpstatus.h" /> + <ClInclude Include="upstream\upstreamapply.h"> + <Filter>upstream</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -76,6 +79,9 @@ <ClCompile Include="experimental\vfs.cpp" /> <ClCompile Include="monitoring\httpstats.cpp" /> <ClCompile Include="monitoring\httpstatus.cpp" /> + <ClCompile Include="upstream\upstreamapply.cpp"> + <Filter>upstream</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="cache"> |