aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-13 14:35:42 +0100
committerPer Larsson <[email protected]>2021-11-13 14:35:42 +0100
commitc2eb50ecfa58a11ffbacb0127d5ddf10ab7e5663 (patch)
tree90c95750cb6972329aaddc91e69444bd09570b6b
parentCleanup attachment validation. (diff)
parentRemote Apply: Get Expire timeout from worker (diff)
downloadzen-c2eb50ecfa58a11ffbacb0127d5ddf10ab7e5663.tar.xz
zen-c2eb50ecfa58a11ffbacb0127d5ddf10ab7e5663.zip
Merge branch 'main' into zcache-batch
-rw-r--r--zenserver/upstream/jupiter.cpp215
-rw-r--r--zenserver/upstream/jupiter.h18
-rw-r--r--zenserver/upstream/upstreamapply.cpp1492
-rw-r--r--zenserver/upstream/upstreamapply.h172
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters6
6 files changed, 1841 insertions, 64 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 7a36b5841..4caa5c8df 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -182,9 +182,14 @@ CloudCacheSession::GetBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const IoBuffer Buffer =
+ Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
@@ -214,6 +219,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -246,6 +255,10 @@ CloudCacheSession::GetObject(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -571,67 +584,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
CloudCacheResult
CloudCacheSession::BlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
-
- Session.SetOption(cpr::Url{Uri.c_str()});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return CacheTypeExists("blobs"sv, Key);
}
CloudCacheResult
CloudCacheSession::CompressedBlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
+ return CacheTypeExists("compressed-blobs"sv, Key);
+}
- Session.SetOption(cpr::Url{Uri.c_str()});
+CloudCacheResult
+CloudCacheSession::ObjectExists(const IoHash& Key)
+{
+ return CacheTypeExists("objects"sv, Key);
+}
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+CloudCacheExistsResult
+CloudCacheSession::BlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("blobs"sv, Keys);
+}
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
+CloudCacheExistsResult
+CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("compressed-blobs"sv, Keys);
+}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+CloudCacheExistsResult
+CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("objects"sv, Keys);
}
CloudCacheResult
-CloudCacheSession::ObjectExists(const IoHash& Key)
+CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -640,14 +627,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
- cpr::Session& Session = m_SessionState->Session;
+ auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
if (Response.error)
{
@@ -662,7 +651,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -671,13 +660,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -698,7 +686,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
CloudCacheResult
-CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
+CloudCacheSession::GetObjectTree(const IoHash& Key)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -707,15 +695,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree";
- auto& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
if (Response.error)
{
@@ -761,6 +749,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode)
return true;
}
+CloudCacheResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+
+ cpr::Response Response = Session.Head();
+ ZEN_DEBUG("HEAD {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+}
+
+CloudCacheExistsResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ ExtendableStringBuilder<256> Query;
+ for (const auto& Key : Keys)
+ {
+ Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString();
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query;
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ CloudCacheExistsResult Result{
+ CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}};
+
+ if (Result.Success)
+ {
+ IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ for (auto& Item : ExistsResponse["id"sv])
+ {
+ if (Item.IsHash())
+ {
+ Result.Have.insert(Item.AsHash());
+ }
+ }
+ }
+
+ return Result;
+}
+
//////////////////////////////////////////////////////////////////////////
//
// ServiceUrl: https://jupiter.devtools.epicgames.com
@@ -778,7 +852,14 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options)
, m_BlobStoreNamespace(Options.BlobStoreNamespace)
, m_OAuthClientId(Options.OAuthClientId)
, m_OAuthSecret(Options.OAuthSecret)
+, m_AccessToken(Options.AccessToken)
{
+ if (!Options.AccessToken.empty())
+ {
+ // If an access token was provided, OAuth settings are not used.
+ return;
+ }
+
if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv))
{
ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider);
@@ -828,6 +909,12 @@ CloudCacheClient::AcquireAccessToken()
{
using namespace std::chrono;
+ // If an access token was provided, return it instead of querying OAuth
+ if (!m_AccessToken.empty())
+ {
+ return {m_AccessToken, steady_clock::time_point::max()};
+ }
+
ExtendableStringBuilder<128> OAuthFormData;
OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 9471ef64f..13d65587e 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -12,6 +12,7 @@
#include <chrono>
#include <list>
#include <memory>
+#include <set>
#include <vector>
struct ZenCacheValue;
@@ -64,6 +65,11 @@ struct FinalizeRefResult : CloudCacheResult
std::vector<IoHash> Needs;
};
+struct CloudCacheExistsResult : CloudCacheResult
+{
+ std::set<IoHash> Have;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -95,12 +101,18 @@ public:
FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
+
CloudCacheResult BlobExists(const IoHash& Key);
CloudCacheResult CompressedBlobExists(const IoHash& Key);
CloudCacheResult ObjectExists(const IoHash& Key);
+ CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys);
+ CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys);
+ CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys);
+
CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData);
CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
+ CloudCacheResult GetObjectTree(const IoHash& Key);
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
@@ -109,6 +121,10 @@ private:
const CloudCacheAccessToken& GetAccessToken();
bool VerifyAccessToken(long StatusCode);
+ CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key);
+
+ CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys);
+
spdlog::logger& m_Log;
RefPtr<CloudCacheClient> m_CacheClient;
detail::CloudCacheSessionState* m_SessionState;
@@ -122,6 +138,7 @@ struct CloudCacheClientOptions
std::string_view OAuthProvider;
std::string_view OAuthClientId;
std::string_view OAuthSecret;
+ std::string_view AccessToken;
bool UseLegacyDdc = false;
};
@@ -152,6 +169,7 @@ private:
std::string m_BlobStoreNamespace;
std::string m_OAuthClientId;
std::string m_OAuthSecret;
+ std::string m_AccessToken;
bool m_IsValid = false;
RwLock m_SessionStateLock;
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
new file mode 100644
index 000000000..3f1b0d8f9
--- /dev/null
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -0,0 +1,1492 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "upstreamapply.h"
+#include "jupiter.h"
+#include "zen.h"
+
+#include <zencore/blockingqueue.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/session.h>
+#include <zencore/stats.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+
+#include <zenstore/cas.h>
+#include <zenstore/cidstore.h>
+
+#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
+
+#include <fmt/format.h>
+
+#include <algorithm>
+#include <atomic>
+#include <map>
+#include <set>
+#include <stack>
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail {
+
+ class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
+ {
+ public:
+ HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ : m_Log(logging::Get("upstream-apply"))
+ , m_CasStore(CasStore)
+ , m_CidStore(CidStore)
+ {
+ using namespace fmt::literals;
+ m_DisplayName = "Horde - '{}'"_format(Options.ServiceUrl);
+ m_Client = new CloudCacheClient(Options);
+ m_ChannelId = "zen-{}"_format(zen::GetSessionIdString());
+ }
+
+ virtual ~HordeUpstreamApplyEndpoint() = default;
+
+ virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
+
+ virtual bool IsHealthy() const override { return m_HealthOk.load(); }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
+ {
+ try
+ {
+ CloudCacheSession Session(m_Client);
+ CloudCacheResult Result = Session.Authenticate();
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Reason = Err.what(), .Ok = false};
+ }
+ }
+
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
+
+ virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override
+ {
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ try
+ {
+ UpstreamData UpstreamData;
+ if (!ProcessApplyKey(ApplyRecord, UpstreamData))
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}};
+ }
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_PendingTasks.contains(UpstreamData.TaskId))
+ {
+ // Pending task is already queued, return success
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+ m_PendingTasks[UpstreamData.TaskId] = ApplyRecord;
+ }
+
+ CloudCacheSession Session(m_Client);
+
+ {
+ CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ UpstreamData.Blobs.clear();
+ }
+
+ {
+ CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ UpstreamData.Objects.clear();
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId);
+ Writer.BeginArray("t"sv);
+ Writer.AddObjectAttachment(UpstreamData.TaskId);
+ Writer.EndArray();
+ IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer();
+
+ CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_PendingTasks.erase(UpstreamData.TaskId);
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+ catch (std::exception& Err)
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
+ }
+ }
+
+ [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs)
+ {
+ if (Blobs.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing blobs
+ std::set<IoHash> Keys;
+ for (const auto& It : Blobs)
+ {
+ Keys.insert(It.first);
+ }
+
+ CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (ExistsResult.ErrorCode != 0)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode,
+ .Reason = std::move(ExistsResult.Reason)};
+ }
+
+ // TODO: Batch upload missing blobs
+
+ for (const auto& It : Blobs)
+ {
+ if (ExistsResult.Have.contains(It.first))
+ {
+ continue;
+ }
+
+ CloudCacheResult Result = Session.PutBlob(It.first, It.second);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode,
+ .Reason = std::move(Result.Reason)};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
+ [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
+ {
+ if (Objects.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing objects
+ std::set<IoHash> Keys;
+ for (const auto& It : Objects)
+ {
+ Keys.insert(It.first);
+ }
+
+ CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (ExistsResult.ErrorCode != 0)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode,
+ .Reason = std::move(ExistsResult.Reason)};
+ }
+
+ // TODO: Batch upload missing objects
+
+ for (const auto& It : Objects)
+ {
+ if (ExistsResult.Have.contains(It.first))
+ {
+ continue;
+ }
+
+ CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer());
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode,
+ .Reason = std::move(Result.Reason)};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
+ enum class ComputeTaskState : int32_t
+ {
+ Queued = 0,
+ Executing = 1,
+ Complete = 2,
+ };
+
+ enum class ComputeTaskOutcome : int32_t
+ {
+ Success = 0,
+ Failed = 1,
+ Cancelled = 2,
+ NoResult = 3,
+ Exipred = 4,
+ BlobNotFound = 5,
+ Exception = 6,
+ };
+
+ virtual GetUpstreamApplyUpdatesResult GetUpdates() override
+ {
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ UpstreamApplyCompleted CompletedTasks;
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_PendingTasks.empty())
+ {
+ // Nothing to do.
+ return {.Success = true};
+ }
+ }
+
+ try
+ {
+ CloudCacheSession Session(m_Client);
+
+ CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId);
+ Bytes += UpdatesResult.Bytes;
+ ElapsedSeconds += UpdatesResult.ElapsedSeconds;
+ if (UpdatesResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!UpdatesResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response);
+
+ // zen::StringBuilder<4096> ObjStr;
+ // zen::CompactBinaryToJson(TaskStatus, ObjStr);
+
+ for (auto& It : TaskStatus["u"sv])
+ {
+ CbObjectView Status = It.AsObjectView();
+ const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
+
+ const std::string_view AgentId = TaskStatus["a"sv].AsString();
+ const std::string_view LeaseId = TaskStatus["l"sv].AsString();
+
+ // Only care about completed tasks
+ if (State != ComputeTaskState::Complete)
+ {
+ continue;
+ }
+
+ const IoHash TaskId = Status["h"sv].AsObjectAttachment();
+
+ IoHash WorkerId;
+ IoHash ActionId;
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ auto TaskIt = m_PendingTasks.find(TaskId);
+ if (TaskIt == m_PendingTasks.end())
+ {
+ continue;
+ }
+ WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
+ ActionId = TaskIt->second.Action.GetHash();
+ m_PendingTasks.erase(TaskIt);
+ }
+
+ GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+
+ CompletedTasks[WorkerId][ActionId] = std::move(Result);
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
+ }
+ catch (std::exception& Err)
+ {
+ m_HealthOk = false;
+ return {
+ .Error{.ErrorCode = -1, .Reason = Err.what()},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .Completed = std::move(CompletedTasks),
+ };
+ }
+ }
+
+ virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; }
+
+ private:
+ spdlog::logger& Log() { return m_Log; }
+
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
+ spdlog::logger& m_Log;
+ std::string m_DisplayName;
+ RefPtr<CloudCacheClient> m_Client;
+ UpstreamApplyEndpointStats m_Stats;
+ std::atomic_bool m_HealthOk{false};
+ std::string m_ChannelId;
+
+ std::mutex m_TaskMutex;
+ std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks;
+
+ struct UpstreamData
+ {
+ std::map<IoHash, IoBuffer> Blobs;
+ std::map<IoHash, CbObject> Objects;
+ IoHash TaskId;
+ IoHash RequirementsId;
+ };
+
+ struct UpstreamDirectory
+ {
+ std::filesystem::path Path;
+ std::map<std::string, UpstreamDirectory> Directories;
+ std::set<std::string> Files;
+ };
+
+ [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session)
+ {
+ try
+ {
+ const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32();
+
+ if (Outcome != ComputeTaskOutcome::Success)
+ {
+ const std::string_view Detail = TaskStatus["d"sv].AsString();
+ return {.Error{.ErrorCode = -1, .Reason = std::string(Detail)}};
+ }
+
+ const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment();
+ const DateTime Time = TaskStatus["t"sv].AsDateTime();
+ const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment();
+ const std::string_view AgentId = TaskStatus["a"sv].AsString();
+ const std::string_view LeaseId = TaskStatus["l"sv].AsString();
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Get Result object and all Object Attachments + Binary Attachment IDs
+ CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash);
+ Bytes += ObjectTreeResult.Bytes;
+ ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+
+ if (ObjectTreeResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!ObjectTreeResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ std::map<IoHash, IoBuffer> TreeObjectData;
+ std::map<IoHash, IoBuffer> TreeBinaryData;
+
+ MemoryView ResponseView = ObjectTreeResult.Response;
+ while (ResponseView.GetSize() > 0)
+ {
+ CbFieldView Field = CbFieldView(ResponseView.GetData());
+ ResponseView += Field.GetSize();
+ if (Field.IsObjectAttachment())
+ {
+ const IoHash Hash = Field.AsObjectAttachment();
+ Field = CbFieldView(ResponseView.GetData());
+ ResponseView += Field.GetSize();
+ if (!Field.IsObject()) // No data
+ {
+ TreeObjectData[Hash] = {};
+ continue;
+ }
+ MemoryView FieldView = Field.AsObjectView().GetView();
+
+ TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize());
+ }
+ else if (Field.IsBinaryAttachment())
+ {
+ const IoHash Hash = Field.AsBinaryAttachment();
+ TreeBinaryData[Hash] = {};
+ }
+ else // Unknown type
+ {
+ }
+ }
+
+ for (auto& It : TreeObjectData)
+ {
+ if (It.second.GetSize() == 0)
+ {
+ CloudCacheResult ObjectResult = Session.GetObject(It.first);
+ Bytes += ObjectTreeResult.Bytes;
+ ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+ if (ObjectTreeResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!ObjectResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ It.second = std::move(ObjectResult.Response);
+ }
+ }
+
+ for (auto& It : TreeBinaryData)
+ {
+ if (It.second.GetSize() == 0)
+ {
+ CloudCacheResult BlobResult = Session.GetBlob(It.first);
+ Bytes += ObjectTreeResult.Bytes;
+ ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+ if (BlobResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!BlobResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ It.second = std::move(BlobResult.Response);
+ }
+ }
+
+ CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]);
+ int32_t ExitCode = ResultObject["e"sv].AsInt32();
+ IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
+ IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
+ IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment();
+
+ std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize());
+ std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize());
+
+ if (ExitCode != 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
+ }
+
+ CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]);
+
+ // Get build.output
+ IoHash BuildOutputId;
+ IoBuffer BuildOutput;
+ for (auto& It : OutputObject["f"sv])
+ {
+ const CbObjectView FileObject = It.AsObjectView();
+ if (FileObject["n"sv].AsString() == "Build.output"sv)
+ {
+ BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
+ BuildOutput = TreeBinaryData[BuildOutputId];
+ break;
+ }
+ }
+
+ if (BuildOutput.GetSize() == 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ // Get Output directory node
+ IoBuffer OutputDirectoryTree;
+ for (auto& It : OutputObject["d"sv])
+ {
+ const CbObjectView DirectoryObject = It.AsObjectView();
+ if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
+ {
+ OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
+ break;
+ }
+ }
+
+ if (OutputDirectoryTree.GetSize() == 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ // load build.output as CbObject
+
+ // Move Outputs from Horde to CbPackage
+
+ std::unordered_map<IoHash, IoHash> CidToCompressedId;
+ CbPackage OutputPackage;
+ CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);
+ int64_t TotalAttachmentBytes = 0;
+ int64_t TotalRawAttachmentBytes = 0;
+
+ for (auto& It : OutputDirectoryTreeObject["f"sv])
+ {
+ CbObjectView FileObject = It.AsObjectView();
+ // Name is the uncompressed hash
+ IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
+ // Hash is the compressed data hash, and how it is stored in Horde
+ IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
+
+ if (!TreeBinaryData.contains(CompressedId))
+ {
+ Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString());
+ return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ CidToCompressedId[DecompressedId] = CompressedId;
+ }
+
+ // Iterate attachments, verify all chunks exist, and add to CbPackage
+ bool AnyErrors = false;
+ CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
+ BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
+ const IoHash DecompressedId = Field.AsHash();
+ if (!CidToCompressedId.contains(DecompressedId))
+ {
+ Log().warn("Attachment not found {}", DecompressedId.ToHexString());
+ AnyErrors = true;
+ return;
+ }
+ const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
+
+ if (!TreeBinaryData.contains(CompressedId))
+ {
+ Log().warn("Missing output {} compressed {} uncompressed",
+ CompressedId.ToHexString(),
+ DecompressedId.ToHexString());
+ AnyErrors = true;
+ return;
+ }
+
+ CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId]));
+
+ if (!AttachmentBuffer)
+ {
+ Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
+ CompressedId.ToHexString(),
+ DecompressedId.ToHexString());
+ AnyErrors = true;
+ return;
+ }
+
+ TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+ TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
+
+ CbAttachment Attachment(AttachmentBuffer);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ if (AnyErrors)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ OutputPackage.SetObject(BuildOutputObject);
+
+ return {.OutputPackage = std::move(OutputPackage),
+ .TotalAttachmentBytes = TotalAttachmentBytes,
+ .TotalRawAttachmentBytes = TotalRawAttachmentBytes,
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr),
+ .Success = true};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ }
+ }
+
+ [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
+ {
+ std::string ExecutablePath;
+ std::map<std::string, std::string> Environment;
+ std::set<std::filesystem::path> InputFiles;
+ std::map<std::filesystem::path, IoHash> InputFileHashes;
+
+ ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString();
+ if (ExecutablePath.empty())
+ {
+ Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor",
+ ApplyRecord.WorkerDescriptor.GetHash());
+ return false;
+ }
+
+ for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
+ {
+ CbObjectView FileEntry = It.AsObjectView();
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ {
+ return false;
+ }
+ }
+
+ for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
+ {
+ CbObjectView FileEntry = It.AsObjectView();
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ {
+ return false;
+ }
+ }
+
+ for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
+ {
+ std::string_view Env = It.AsString();
+ auto Index = Env.find('=');
+ if (Index < 0)
+ {
+ Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
+ return false;
+ }
+
+ Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
+ }
+
+ {
+ static const std::filesystem::path BuildActionPath = "Build.action"sv;
+ static const std::filesystem::path InputPath = "Inputs"sv;
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+
+ InputFiles.insert(BuildActionPath);
+ InputFileHashes[BuildActionPath] = ActionId;
+ Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
+ ApplyRecord.Action.GetBuffer().GetSize());
+
+ bool AnyErrors = false;
+ ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Cid = Field.AsHash();
+ const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
+
+ if (!DataBuffer)
+ {
+ Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
+ AnyErrors = true;
+ return;
+ }
+
+ if (InputFiles.contains(FilePath))
+ {
+ return;
+ }
+
+ const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
+
+ InputFiles.insert(FilePath);
+ InputFileHashes[FilePath] = CompressedId;
+ Data.Blobs[CompressedId] = std::move(DataBuffer);
+ });
+
+ if (AnyErrors)
+ {
+ return false;
+ }
+ }
+
+ const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
+
+ CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects);
+ const IoHash SandboxHash = Sandbox.GetHash();
+ Data.Objects[SandboxHash] = std::move(Sandbox);
+
+ CbObject Requirements = BuildRequirements("OSFamily == 'Windows'"sv, {}, false);
+ const IoHash RequirementsId = Requirements.GetHash();
+ Data.Objects[RequirementsId] = std::move(Requirements);
+ Data.RequirementsId = RequirementsId;
+
+ CbObject Task = BuildTask(ExecutablePath,
+ {"-Build=build.action"},
+ Environment,
+ {},
+ SandboxHash,
+ RequirementsId,
+ {"Build.output", "Outputs"});
+
+ const IoHash TaskId = Task.GetHash();
+ Data.Objects[TaskId] = std::move(Task);
+ Data.TaskId = TaskId;
+
+ return true;
+ }
+
+ [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
+ std::set<std::filesystem::path>& InputFiles,
+ std::map<std::filesystem::path, IoHash>& InputFileHashes,
+ std::map<IoHash, IoBuffer>& Blobs)
+ {
+ const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
+ const IoHash ChunkId = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+ IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId);
+
+ if (!DataBuffer)
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
+ return false;
+ }
+
+ if (DataBuffer.Size() != Size)
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}",
+ ChunkId,
+ DataBuffer.Size(),
+ Size);
+ return false;
+ }
+
+ if (InputFiles.contains(FilePath))
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
+ return false;
+ }
+
+ InputFiles.insert(FilePath);
+ InputFileHashes[FilePath] = ChunkId;
+ Blobs[ChunkId] = std::move(DataBuffer);
+ return true;
+ }
+
+ [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles)
+ {
+ static const std::filesystem::path RootPath;
+ std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories;
+ UpstreamDirectory RootDirectory = {.Path = RootPath};
+
+ AllDirectories[RootPath] = &RootDirectory;
+
+ // Build tree from flat list
+ for (const auto& Path : InputFiles)
+ {
+ if (Path.has_parent_path())
+ {
+ if (!AllDirectories.contains(Path.parent_path()))
+ {
+ std::stack<std::string> PathSplit;
+ {
+ std::filesystem::path ParentPath = Path.parent_path();
+ PathSplit.push(ParentPath.filename().string());
+ while (ParentPath.has_parent_path())
+ {
+ ParentPath = ParentPath.parent_path();
+ PathSplit.push(ParentPath.filename().string());
+ }
+ }
+ UpstreamDirectory* ParentPtr = &RootDirectory;
+ while (!PathSplit.empty())
+ {
+ if (!ParentPtr->Directories.contains(PathSplit.top()))
+ {
+ std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()};
+ ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath};
+ AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()];
+ }
+ ParentPtr = &ParentPtr->Directories[PathSplit.top()];
+ PathSplit.pop();
+ }
+ }
+
+ AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string());
+ }
+ else
+ {
+ RootDirectory.Files.insert(Path.filename().string());
+ }
+ }
+
+ return RootDirectory;
+ }
+
+ [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
+ const std::map<std::filesystem::path, IoHash>& InputFileHashes,
+ const std::map<IoHash, IoBuffer>& Blobs,
+ std::map<IoHash, CbObject>& Objects)
+ {
+ CbObjectWriter DirectoryTreeWriter;
+
+ if (!RootDirectory.Files.empty())
+ {
+ DirectoryTreeWriter.BeginArray("f"sv);
+ for (const auto& File : RootDirectory.Files)
+ {
+ const std::filesystem::path FilePath = {RootDirectory.Path / File};
+ const IoHash& FileHash = InputFileHashes.at(FilePath);
+ const uint64_t FileSize = Blobs.at(FileHash).Size();
+ DirectoryTreeWriter.BeginObject();
+ DirectoryTreeWriter.AddString("n"sv, File);
+ DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
+ DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size
+ // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
+ DirectoryTreeWriter.EndObject();
+ }
+ DirectoryTreeWriter.EndArray();
+ }
+
+ if (!RootDirectory.Directories.empty())
+ {
+ DirectoryTreeWriter.BeginArray("d"sv);
+ for (const auto& Item : RootDirectory.Directories)
+ {
+ CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
+ const IoHash DirectoryHash = Directory.GetHash();
+ Objects[DirectoryHash] = std::move(Directory);
+
+ DirectoryTreeWriter.BeginObject();
+ DirectoryTreeWriter.AddString("n"sv, Item.first);
+ DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash);
+ DirectoryTreeWriter.EndObject();
+ }
+ DirectoryTreeWriter.EndArray();
+ }
+
+ return std::move(DirectoryTreeWriter.Save());
+ }
+
+ [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
+ const std::map<std::string_view, int32_t>& Resources,
+ const bool Exclusive)
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("c", Condition);
+ if (!Resources.empty())
+ {
+ Writer.BeginArray("r");
+ for (const auto& Resource : Resources)
+ {
+ Writer.BeginArray();
+ Writer.AddString(Resource.first);
+ Writer.AddInteger(Resource.second);
+ Writer.EndArray();
+ }
+ Writer.EndArray();
+ }
+ Writer.AddBool("e", Exclusive);
+ return std::move(Writer.Save());
+ }
+
+ [[nodiscard]] CbObject BuildTask(const std::string_view Executable,
+ const std::vector<std::string>& Arguments,
+ const std::map<std::string, std::string>& Environment,
+ const std::string_view WorkingDirectory,
+ const IoHash& SandboxHash,
+ const IoHash& RequirementsId,
+ const std::set<std::string>& Outputs)
+ {
+ CbObjectWriter TaskWriter;
+ TaskWriter.AddString("e"sv, Executable);
+
+ if (!Arguments.empty())
+ {
+ TaskWriter.BeginArray("a"sv);
+ for (const auto& Argument : Arguments)
+ {
+ TaskWriter.AddString(Argument);
+ }
+ TaskWriter.EndArray();
+ }
+
+ if (!Environment.empty())
+ {
+ TaskWriter.BeginArray("v"sv);
+ for (const auto& Env : Environment)
+ {
+ TaskWriter.BeginArray();
+ TaskWriter.AddString(Env.first);
+ TaskWriter.AddString(Env.second);
+ TaskWriter.EndArray();
+ }
+ TaskWriter.EndArray();
+ }
+
+ if (!WorkingDirectory.empty())
+ {
+ TaskWriter.AddString("s"sv, WorkingDirectory);
+ }
+
+ TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
+ TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
+
+ // Outputs
+ if (!Outputs.empty())
+ {
+ TaskWriter.BeginArray("o"sv);
+ for (const auto& Output : Outputs)
+ {
+ TaskWriter.AddString(Output);
+ }
+ TaskWriter.EndArray();
+ }
+
+ return std::move(TaskWriter.Save());
+ }
+ };
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+struct UpstreamApplyStats
+{
+ static constexpr uint64_t MaxSampleCount = 1000ull;
+
+ UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
+
+ void Add(spdlog::logger& Logger,
+ UpstreamApplyEndpoint& Endpoint,
+ const PostUpstreamApplyResult& Result,
+ const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ {
+ UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
+ {
+ Stats.PostCount++;
+ Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
+ }
+
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
+ {
+ Dump(Logger, Endpoints);
+ }
+ }
+
+ void Add(spdlog::logger& Logger,
+ UpstreamApplyEndpoint& Endpoint,
+ const GetUpstreamApplyUpdatesResult& Result,
+ const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ {
+ UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
+ {
+ Stats.UpdateCount++;
+ Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+ if (!Result.Completed.empty())
+ {
+ uint64_t Completed = 0;
+ for (auto& It : Result.Completed)
+ {
+ Completed += It.second.size();
+ }
+ Stats.CompleteCount.fetch_add(Completed);
+ }
+ }
+
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
+ {
+ Dump(Logger, Endpoints);
+ }
+ }
+
+ void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ {
+ for (auto& Ep : Endpoints)
+ {
+ // These stats will not be totally correct as the numbers are not captured atomically
+
+ UpstreamApplyEndpointStats& Stats = Ep->Stats();
+ const uint64_t PostCount = Stats.PostCount;
+ const uint64_t CompleteCount = Stats.CompleteCount;
+ const uint64_t UpdateCount = Stats.UpdateCount;
+ const double DownBytes = Stats.DownBytes;
+ const double SecondsDown = Stats.SecondsDown;
+ const double UpBytes = Stats.UpBytes;
+ const double SecondsUp = Stats.SecondsUp;
+
+ const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
+ const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
+ const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
+
+ Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ CompleteRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
+ }
+ }
+
+ bool m_Enabled;
+ std::atomic_uint64_t m_SampleCount = {};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamApply final : public UpstreamApply
+{
+public:
+ DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ : m_Log(logging::Get("upstream-apply"))
+ , m_Options(Options)
+ , m_CasStore(CasStore)
+ , m_CidStore(CidStore)
+ , m_Stats(Options.StatsEnabled)
+ {
+ }
+
+ virtual ~DefaultUpstreamApply() { Shutdown(); }
+
+ virtual bool Initialize() override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ const UpstreamEndpointHealth Health = Endpoint->Initialize();
+ if (Health.Ok)
+ {
+ Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ }
+ else
+ {
+ Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+
+ m_RunState.IsRunning = !m_Endpoints.empty();
+
+ if (m_RunState.IsRunning)
+ {
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this);
+ }
+
+ m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this);
+
+ m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this);
+ }
+
+ return m_RunState.IsRunning;
+ }
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override
+ {
+ m_Endpoints.emplace_back(std::move(Endpoint));
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override
+ {
+ if (m_RunState.IsRunning)
+ {
+ const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+ const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300);
+
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ // Already in progress
+ return {.ApplyId = ActionId, .Success = true};
+ }
+
+ std::chrono::steady_clock::time_point ExpireTime =
+ TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds)
+ : std::chrono::steady_clock::time_point::max();
+
+ m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
+ }
+
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(ApplyRecord));
+ }
+ else
+ {
+ ProcessApplyRecord(std::move(ApplyRecord));
+ }
+
+ return {.ApplyId = ActionId, .Success = true};
+ }
+
+ return {};
+ }
+
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override
+ {
+ if (m_RunState.IsRunning)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ return {.Status = *Status, .Success = true};
+ }
+ }
+
+ return {};
+ }
+
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "queue_count" << m_UpstreamQueue.Size();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamApplyEndpointStats& Stats = Ep->Stats();
+ const uint64_t PostCount = Stats.PostCount;
+ const uint64_t CompleteCount = Stats.CompleteCount;
+ const uint64_t UpdateCount = Stats.UpdateCount;
+ const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
+
+ Status << "post_count" << PostCount;
+ Status << "complete_count" << PostCount;
+ Status << "update_count" << Stats.UpdateCount;
+
+ Status << "complete_ratio" << CompleteRate;
+ Status << "downloaded_mb" << Stats.DownBytes;
+ Status << "uploaded_mb" << Stats.UpBytes;
+ Status << "error_count" << Stats.ErrorCount;
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
+private:
+ // The caller is responsible for locking if required
+ UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId)
+ {
+ if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end())
+ {
+ if (auto It2 = It->second.find(ActionId); It2 != It->second.end())
+ {
+ return &It2->second;
+ }
+ }
+ return nullptr;
+ }
+
+ void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord)
+ {
+ const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+ try
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ if (Result.Success)
+ {
+ Status->State = UpstreamApplyState::Executing;
+ }
+ else
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error = std::move(Result.Error),
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds};
+ }
+ }
+ }
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ return;
+ }
+ }
+
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}};
+ }
+ Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
+ }
+ }
+ catch (std::exception& e)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}};
+ }
+ Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what());
+ }
+ }
+
+ void ProcessUpstreamQueue()
+ {
+ for (;;)
+ {
+ UpstreamApplyRecord ApplyRecord;
+ if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord))
+ {
+ ProcessApplyRecord(std::move(ApplyRecord));
+ }
+
+ if (!m_RunState.IsRunning)
+ {
+ break;
+ }
+ }
+ }
+
+ void ProcessApplyUpdates()
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates();
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (!Result.Success)
+ {
+ Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason);
+ }
+
+ if (!Result.Completed.empty())
+ {
+ for (auto& It : Result.Completed)
+ {
+ for (auto& It2 : It.second)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(It.first, It2.first); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = std::move(It2.second);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ void ProcessUpstreamUpdates()
+ {
+ const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval);
+ for (;;)
+ {
+ std::this_thread::sleep_for(UpdateSleep);
+
+ if (!m_RunState.IsRunning)
+ {
+ break;
+ }
+
+ ProcessApplyUpdates();
+
+ // Remove any expired tasks, regardless of state
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ for (auto& WorkerIt : m_ApplyTasks)
+ {
+ const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) {
+ return Item.second.ExpireTime < std::chrono::steady_clock::now();
+ });
+ if (Count > 0)
+ {
+ Log().debug("Removed '{}' expired tasks", Count);
+ }
+ }
+ const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); });
+ if (Count > 0)
+ {
+ Log().debug("Removed '{}' empty task lists", Count);
+ }
+ }
+ }
+ }
+
+ void MonitorEndpoints()
+ {
+ for (;;)
+ {
+ {
+ std::unique_lock Lock(m_RunState.Mutex);
+ if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
+ {
+ break;
+ }
+ }
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (!Endpoint->IsHealthy())
+ {
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+ Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
+ }
+ else
+ {
+ Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+ }
+ }
+ }
+
+ void Shutdown()
+ {
+ if (m_RunState.Stop())
+ {
+ m_UpstreamQueue.CompleteAdding();
+ for (std::thread& Thread : m_UpstreamThreads)
+ {
+ Thread.join();
+ }
+
+ m_EndpointMonitorThread.join();
+ m_UpstreamUpdatesThread.join();
+ m_UpstreamThreads.clear();
+ m_Endpoints.clear();
+ }
+ }
+
+ spdlog::logger& Log() { return m_Log; }
+
+ using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>;
+
+ struct RunState
+ {
+ std::mutex Mutex;
+ std::condition_variable ExitSignal;
+ std::atomic_bool IsRunning{false};
+
+ bool Stop()
+ {
+ bool Stopped = false;
+ {
+ std::scoped_lock Lock(Mutex);
+ Stopped = IsRunning.exchange(false);
+ }
+ if (Stopped)
+ {
+ ExitSignal.notify_all();
+ }
+ return Stopped;
+ }
+ };
+
+ spdlog::logger& m_Log;
+ UpstreamApplyOptions m_Options;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
+ UpstreamApplyQueue m_UpstreamQueue;
+ UpstreamApplyStats m_Stats;
+ UpstreamApplyTasks m_ApplyTasks;
+ std::mutex m_ApplyTasksMutex;
+ std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::thread m_UpstreamUpdatesThread;
+ std::thread m_EndpointMonitorThread;
+ RunState m_RunState;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<UpstreamApply>
+MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+{
+ return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore);
+}
+
+std::unique_ptr<UpstreamApplyEndpoint>
+MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore)
+{
+ return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, CasStore, CidStore);
+}
+
+} // namespace zen
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
new file mode 100644
index 000000000..8f72660c7
--- /dev/null
+++ b/zenserver/upstream/upstreamapply.h
@@ -0,0 +1,172 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinarypackage.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/zencore.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace zen {
+
+class CbObjectWriter;
+class CasStore;
+class CidStore;
+class ZenCacheStore;
+struct CloudCacheClientOptions;
+
+enum class UpstreamApplyState : int32_t
+{
+ Queued = 0,
+ Executing = 1,
+ Complete = 2,
+};
+
+struct UpstreamApplyRecord
+{
+ CbObject WorkerDescriptor;
+ CbObject Action;
+};
+
+struct UpstreamApplyOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ std::chrono::seconds UpdatesInterval{5};
+ uint32_t ThreadCount = 4;
+ bool StatsEnabled = false;
+};
+
+struct UpstreamApplyError
+{
+ int32_t ErrorCode{};
+ std::string Reason{};
+
+ explicit operator bool() const { return ErrorCode != 0; }
+};
+
+struct PostUpstreamApplyResult
+{
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct GetUpstreamApplyResult
+{
+ CbPackage OutputPackage{};
+ int64_t TotalAttachmentBytes{};
+ int64_t TotalRawAttachmentBytes{};
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ std::string StdOut{};
+ std::string StdErr{};
+ bool Success = false;
+};
+
+using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>;
+
+struct GetUpstreamApplyUpdatesResult
+{
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ UpstreamApplyCompleted Completed{};
+ bool Success = false;
+};
+
+struct UpstreamApplyStatus
+{
+ UpstreamApplyState State{};
+ GetUpstreamApplyResult Result{};
+ std::chrono::steady_clock::time_point ExpireTime{};
+};
+
+using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>;
+
+struct UpstreamEndpointHealth
+{
+ std::string Reason;
+ bool Ok = false;
+};
+
+struct UpstreamApplyEndpointStats
+{
+ std::atomic_uint64_t PostCount{};
+ std::atomic_uint64_t CompleteCount{};
+ std::atomic_uint64_t UpdateCount{};
+ std::atomic_uint64_t ErrorCount{};
+ std::atomic<double> UpBytes{};
+ std::atomic<double> DownBytes{};
+ std::atomic<double> SecondsUp{};
+ std::atomic<double> SecondsDown{};
+};
+
+/**
+ * The upstream apply endpont is responsible for handling remote execution.
+ */
+class UpstreamApplyEndpoint
+{
+public:
+ virtual ~UpstreamApplyEndpoint() = default;
+
+ virtual UpstreamEndpointHealth Initialize() = 0;
+
+ virtual bool IsHealthy() const = 0;
+
+ virtual UpstreamEndpointHealth CheckHealth() = 0;
+
+ virtual std::string_view DisplayName() const = 0;
+
+ virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0;
+
+ virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0;
+
+ virtual UpstreamApplyEndpointStats& Stats() = 0;
+};
+
+/**
+ * Manages one or more upstream cache endpoints.
+ */
+class UpstreamApply
+{
+public:
+ virtual ~UpstreamApply() = default;
+
+ virtual bool Initialize() = 0;
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0;
+
+ struct EnqueueResult
+ {
+ IoHash ApplyId{};
+ bool Success = false;
+ };
+
+ struct StatusResult
+ {
+ UpstreamApplyStatus Status{};
+ bool Success = false;
+ };
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
+
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
+
+std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
+
+std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options,
+ CasStore& CasStore,
+ CidStore& CidStore);
+
+} // namespace zen
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index d954d3f8d..480d5dd15 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -125,6 +125,7 @@
<ClInclude Include="diag\diagsvcs.h" />
<ClInclude Include="experimental\usnjournal.h" />
<ClInclude Include="targetver.h" />
+ <ClInclude Include="upstream\upstreamapply.h" />
<ClInclude Include="upstream\upstreamcache.h" />
<ClInclude Include="upstream\zen.h" />
<ClInclude Include="windows\service.h" />
@@ -147,6 +148,7 @@
<ClCompile Include="testing\launch.cpp" />
<ClCompile Include="casstore.cpp" />
<ClCompile Include="experimental\usnjournal.cpp" />
+ <ClCompile Include="upstream\upstreamapply.cpp" />
<ClCompile Include="upstream\upstreamcache.cpp" />
<ClCompile Include="upstream\zen.cpp" />
<ClCompile Include="windows\service.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 04c6267ba..6de9230d3 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -41,6 +41,9 @@
<ClInclude Include="experimental\vfs.h" />
<ClInclude Include="monitoring\httpstats.h" />
<ClInclude Include="monitoring\httpstatus.h" />
+ <ClInclude Include="upstream\upstreamapply.h">
+ <Filter>upstream</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -76,6 +79,9 @@
<ClCompile Include="experimental\vfs.cpp" />
<ClCompile Include="monitoring\httpstats.cpp" />
<ClCompile Include="monitoring\httpstatus.cpp" />
+ <ClCompile Include="upstream\upstreamapply.cpp">
+ <Filter>upstream</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="cache">