aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-26 10:46:38 +0200
committerDan Engelbrecht <[email protected]>2022-04-26 10:46:38 +0200
commitbd5b1a15f65f2fde3721acd16ce3a316c03583ea (patch)
treedfa0acea7237697cbdf14a1fa2bc121a060f6ec7
parentMerge branch 'main' into de/cache-with-block-store (diff)
parentCompute tweaks (#78) (diff)
downloadzen-bd5b1a15f65f2fde3721acd16ce3a316c03583ea.tar.xz
zen-bd5b1a15f65f2fde3721acd16ce3a316c03583ea.zip
Merge remote-tracking branch 'origin/main' into de/cache-with-block-store
-rw-r--r--.github/workflows/self_host_build.yml (renamed from .github/workflows/on_prem_windows.yml)8
-rw-r--r--.github/workflows/update_release.yml124
-rw-r--r--zencore/compactbinary.cpp19
-rw-r--r--zencore/filesystem.cpp17
-rw-r--r--zencore/include/zencore/compactbinary.h1
-rw-r--r--zenserver/compute/function.cpp19
-rw-r--r--zenserver/config.cpp65
-rw-r--r--zenserver/config.h8
-rw-r--r--zenserver/upstream/hordecompute.cpp1374
-rw-r--r--zenserver/upstream/jupiter.cpp46
-rw-r--r--zenserver/upstream/jupiter.h7
-rw-r--r--zenserver/upstream/upstreamapply.cpp1321
-rw-r--r--zenserver/upstream/upstreamapply.h45
-rw-r--r--zenserver/zenserver.cpp19
-rw-r--r--zenstore-test/zenstore-test.cpp2
15 files changed, 1725 insertions, 1350 deletions
diff --git a/.github/workflows/on_prem_windows.yml b/.github/workflows/self_host_build.yml
index 04036c38c..2645f9738 100644
--- a/.github/workflows/on_prem_windows.yml
+++ b/.github/workflows/self_host_build.yml
@@ -1,10 +1,8 @@
-name: Validate
+name: Validate Build
on:
- push:
- branches:
- - main
pull_request:
+ types: [opened, reopened]
branches: [ main ]
jobs:
@@ -34,6 +32,7 @@ jobs:
windows-build:
name: Build Windows
+ needs: clang-format
runs-on: [self-hosted, windows, x64]
strategy:
matrix:
@@ -82,6 +81,7 @@ jobs:
linux-build:
name: Build Linux
+ needs: clang-format
runs-on: [self-hosted, linux, x64]
strategy:
matrix:
diff --git a/.github/workflows/update_release.yml b/.github/workflows/update_release.yml
new file mode 100644
index 000000000..62568d1c0
--- /dev/null
+++ b/.github/workflows/update_release.yml
@@ -0,0 +1,124 @@
+name: Build release
+
+on:
+ # push
+ pull_request:
+ types: [closed]
+ branches: [ main ]
+
+jobs:
+ windows-build:
+ # if: github.event.pull_request.merged == true
+ name: Build Windows
+ runs-on: [self-hosted, windows, x64]
+ strategy:
+ matrix:
+ config:
+ - 'release'
+ arch:
+ - 'x64'
+ env:
+ VCPKG_VERSION: 2022.03.10
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Setup xmake
+ uses: xmake-io/github-action-setup-xmake@v1
+ with:
+ xmake-version: 2.6.4
+
+ - name: Installing vcpkg
+ run: |
+ git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg
+ cd .vcpkg
+ .\bootstrap-vcpkg.bat
+ .\vcpkg.exe integrate install
+ cd ..
+
+ - name: Cache vcpkg
+ uses: actions/cache@v2
+ with:
+ path: |
+ ${{ github.workspace }}\.vcpkg\installed
+ key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5
+
+ - name: Config
+ run: |
+ xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }}
+ env:
+ VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
+
+ - name: Build
+ run: |
+ xmake build -v -y
+ env:
+ VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
+
+ # - name: Create Archive
+ # run: |
+ # cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }}
+ # C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\windows-${{ matrix.arch }}-${{ matrix.config }}.zip *
+ # cd ..\..\..\..
+
+ - name: Create Archive
+ run: |
+ cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }}
+ C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\zenserver-win64.zip zenserver.exe
+ cd ..\..\..\..
+
+ - name: Get current release version info
+ run: |
+ $repo = "EpicGames/zen"
+ $releases = "https://api.github.com/repos/$repo/releases/latest"
+ Write-Host Determining latest release
+ $latest = (Invoke-WebRequest -Headers @{"Accept"="application/vnd.github.v3+json";"Authorization"="token ${{ secrets.GITHUB_TOKEN }}"} $releases | ConvertFrom-Json)[0]
+ $current_version_tag = [version]$latest.tag_name.replace('v','')
+ echo "Current version" $current_version_tag
+ $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor,$current_version_tag.Build,$current_version_tag.Revision+1).toString()
+ echo $new_version_tag
+ echo "new_version_tag=$new_version_tag" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
+
+ - name: Create Release
+ id: create_release
+ uses: actions/create-release@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ tag_name: v${{ env.new_version_tag }}
+ release_name: Release
+ draft: false
+ prerelease: false
+
+ # - name: Create Release
+ # id: create_release
+ # uses: actions/create-release@v1
+ # env:
+ # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ # with:
+ # tag_name: ${{ github.ref_name }}
+ # release_name: Release ${{ github.head_ref }}
+ # draft: false
+ # prerelease: false
+
+ # - name: Upload Release Asset
+ # id: upload-release-asset
+ # uses: actions/upload-release-asset@v1
+ # env:
+ # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ # with:
+  #        upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing its ID to get its outputs object, which includes an `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
+ # asset_path: .\windows-${{ matrix.arch }}-${{ matrix.config }}.zip
+ # asset_name: windows-${{ matrix.arch }}-${{ matrix.config }}
+ # asset_content_type: application/zip
+ - name: Upload Release Asset
+ id: upload-release-asset
+ uses: actions/upload-release-asset@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ upload_url: ${{ steps.create_release.outputs.upload_url }}
+ asset_path: .\zenserver-win64.zip
+ asset_name: zenserver-win64.zip
+ asset_content_type: application/zip
+
diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp
index 31d449c41..ffc1da10c 100644
--- a/zencore/compactbinary.cpp
+++ b/zencore/compactbinary.cpp
@@ -60,22 +60,27 @@ GetPlatformToDateTimeBiasInSeconds()
return uint64_t(double(PlatformEpochYear - DateTimeEpochYear) * 365.2425) * 86400;
}
-DateTime
-DateTime::Now()
+uint64_t
+DateTime::NowTicks()
{
- static const uint64_t EpochBias = GetPlatformToDateTimeBiasInSeconds();
- static const uint64_t SecsTo100nsTicks = int64_t(10e9 / 100);
+ static constexpr uint64_t EpochBias = GetPlatformToDateTimeBiasInSeconds();
#if ZEN_PLATFORM_WINDOWS
FILETIME SysTime;
- GetSystemTimeAsFileTime(&SysTime);
- return DateTime{(EpochBias * SecsTo100nsTicks) + (uint64_t(SysTime.dwHighDateTime) << 32) | SysTime.dwLowDateTime};
+ GetSystemTimePreciseAsFileTime(&SysTime);
+ return (EpochBias * TimeSpan::TicksPerSecond) + ((uint64_t(SysTime.dwHighDateTime) << 32) | SysTime.dwLowDateTime);
#else
int64_t SecondsSinceUnixEpoch = time(nullptr);
- return DateTime{(EpochBias + SecondsSinceUnixEpoch) * SecsTo100nsTicks};
+ return (EpochBias + SecondsSinceUnixEpoch) * TimeSpan::TicksPerSecond;
#endif
}
+DateTime
+DateTime::Now()
+{
+ return DateTime{NowTicks()};
+}
+
void
DateTime::Set(int Year, int Month, int Day, int Hour, int Minute, int Second, int MilliSecond)
{
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index 7ff0dc45c..437741161 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -993,7 +993,7 @@ MaximizeOpenFileCount()
{
#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
struct rlimit Limit;
- int Error = getrlimit(RLIMIT_NOFILE, &Limit);
+ int Error = getrlimit(RLIMIT_NOFILE, &Limit);
if (Error)
{
ZEN_WARN("failed getting rlimit RLIMIT_NOFILE, reason '{}'", zen::MakeErrorCode(Error).message());
@@ -1001,13 +1001,22 @@ MaximizeOpenFileCount()
else
{
struct rlimit NewLimit = Limit;
- NewLimit.rlim_cur = NewLimit.rlim_max;
- ZEN_INFO("changing RLIMIT_NOFILE from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max);
+ NewLimit.rlim_cur = NewLimit.rlim_max;
+ ZEN_INFO("changing RLIMIT_NOFILE from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}",
+ Limit.rlim_cur,
+ Limit.rlim_max,
+ NewLimit.rlim_cur,
+ NewLimit.rlim_max);
Error = setrlimit(RLIMIT_NOFILE, &NewLimit);
if (Error != 0)
{
- ZEN_WARN("failed to set RLIMIT_NOFILE limits from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}, reason '{}'", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max, zen::MakeErrorCode(Error).message());
+ ZEN_WARN("failed to set RLIMIT_NOFILE limits from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}, reason '{}'",
+ Limit.rlim_cur,
+ Limit.rlim_max,
+ NewLimit.rlim_cur,
+ NewLimit.rlim_max,
+ zen::MakeErrorCode(Error).message());
}
}
#endif
diff --git a/zencore/include/zencore/compactbinary.h b/zencore/include/zencore/compactbinary.h
index 25fd4a7b2..19f1597dc 100644
--- a/zencore/include/zencore/compactbinary.h
+++ b/zencore/include/zencore/compactbinary.h
@@ -43,6 +43,7 @@ public:
inline uint64_t GetTicks() const { return Ticks; }
+ static uint64_t NowTicks();
static DateTime Now();
int GetYear() const;
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
index 7c5f33e78..dd31013ef 100644
--- a/zenserver/compute/function.cpp
+++ b/zenserver/compute/function.cpp
@@ -54,6 +54,16 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
m_Router.AddPattern("action", "([[:xdigit:]]{40})");
m_Router.RegisterRoute(
+ "ready",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ // Todo: check upstream health
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"workers/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -532,6 +542,15 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject&
ResultObject.AddString("stdout"sv, Completed.StdOut);
ResultObject.AddString("stderr"sv, Completed.StdErr);
ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode);
+ ResultObject.BeginArray("stats"sv);
+ for (const auto& Timepoint : Completed.Timepoints)
+ {
+ ResultObject.BeginObject();
+ ResultObject.AddString("name"sv, Timepoint.first);
+ ResultObject.AddDateTimeTicks("time"sv, Timepoint.second);
+ ResultObject.EndObject();
+ }
+ ResultObject.EndArray();
ResultObject.BeginArray("files"sv);
for (const auto& File : Completed.OutputFiles)
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index ac0f863cc..be91ae4f8 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -389,6 +389,49 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("compute",
"",
+ "upstream-horde-storage-url",
+ "URL to a Horde Storage instance.",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageUrl)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-storage-oauth-url",
+                       "URL to the OAuth provider",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-storage-oauth-clientid",
+ "The OAuth client ID",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId)->default_value(""),
+ "");
+
+ options.add_option(
+ "compute",
+ "",
+ "upstream-horde-storage-oauth-clientsecret",
+ "The OAuth client secret",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-storage-openid-provider",
+ "Name of a registered Open ID provider",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-storage-token",
+ "A static authentication token",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
"upstream-horde-cluster",
"The Horde compute cluster id",
cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster)->default_value(""),
@@ -700,6 +743,28 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
std::string_view("namespace"),
ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace);
};
+
+ if (auto StorageConfig = UpstreamConfig->get<sol::optional<sol::table>>("storage"))
+ {
+ UpdateStringValueFromConfig(StorageConfig.value(),
+ std::string_view("url"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.StorageUrl);
+ UpdateStringValueFromConfig(StorageConfig.value(),
+ std::string_view("oauthprovider"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl);
+ UpdateStringValueFromConfig(StorageConfig.value(),
+ std::string_view("oauthclientid"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId);
+ UpdateStringValueFromConfig(StorageConfig.value(),
+ std::string_view("oauthclientsecret"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret);
+ UpdateStringValueFromConfig(StorageConfig.value(),
+ std::string_view("openidprovider"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider);
+ UpdateStringValueFromConfig(StorageConfig.value(),
+ std::string_view("token"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken);
+ };
}
}
diff --git a/zenserver/config.h b/zenserver/config.h
index 9f1b3645c..49f039d8d 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -38,6 +38,14 @@ struct ZenUpstreamHordeConfig
std::string OAuthClientSecret;
std::string OpenIdProvider;
std::string AccessToken;
+
+ std::string StorageUrl;
+ std::string StorageOAuthUrl;
+ std::string StorageOAuthClientId;
+ std::string StorageOAuthClientSecret;
+ std::string StorageOpenIdProvider;
+ std::string StorageAccessToken;
+
std::string Cluster;
std::string Namespace;
};
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
new file mode 100644
index 000000000..dbf86cc13
--- /dev/null
+++ b/zenserver/upstream/hordecompute.cpp
@@ -0,0 +1,1374 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "upstreamapply.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "jupiter.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compactbinaryvalidation.h>
+# include <zencore/fmtutils.h>
+# include <zencore/session.h>
+# include <zencore/stream.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
+# include <zencore/workthreadpool.h>
+
+# include <zenstore/cas.h>
+# include <zenstore/cidstore.h>
+
+# include <auth/authmgr.h>
+# include <upstream/upstreamcache.h>
+
+# include "cache/structuredcachestore.h"
+# include "diag/logging.h"
+
+# include <fmt/format.h>
+
+# include <algorithm>
+# include <atomic>
+# include <set>
+# include <stack>
+
+namespace zen {
+
+using namespace std::literals;
+
+static const IoBuffer EmptyBuffer;
+static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer);
+
+namespace detail {
+
+ class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
+ {
+ public:
+ HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ CasStore& CasStore,
+ CidStore& CidStore,
+ AuthMgr& Mgr)
+ : m_Log(logging::Get("upstream-apply"))
+ , m_CasStore(CasStore)
+ , m_CidStore(CidStore)
+ , m_AuthMgr(Mgr)
+ {
+ m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl);
+ m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString());
+
+ {
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+
+ if (ComputeAuthConfig.OAuthUrl.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl,
+ .ClientId = ComputeAuthConfig.OAuthClientId,
+ .ClientSecret = ComputeAuthConfig.OAuthClientSecret});
+ }
+ else if (ComputeAuthConfig.OpenIdProvider.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() {
+ AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+ else
+ {
+ CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken),
+ .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
+ TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ }
+
+ m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider));
+ }
+
+ {
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+
+ if (StorageAuthConfig.OAuthUrl.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl,
+ .ClientId = StorageAuthConfig.OAuthClientId,
+ .ClientSecret = StorageAuthConfig.OAuthClientSecret});
+ }
+ else if (StorageAuthConfig.OpenIdProvider.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() {
+ AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+ else
+ {
+ CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken),
+ .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
+ TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ }
+
+ m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider));
+ }
+ }
+
+ virtual ~HordeUpstreamApplyEndpoint() = default;
+
+ virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
+
+ virtual bool IsHealthy() const override { return m_HealthOk.load(); }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
+ {
+ try
+ {
+ CloudCacheSession Session(m_Client);
+ CloudCacheResult Result = Session.Authenticate();
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Reason = Err.what(), .Ok = false};
+ }
+ }
+
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
+
+ virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override
+ {
+ PostUpstreamApplyResult ApplyResult{};
+ ApplyResult.Timepoints.merge(ApplyRecord.Timepoints);
+
+ try
+ {
+ UpstreamData UpstreamData;
+ if (!ProcessApplyKey(ApplyRecord, UpstreamData))
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}};
+ }
+
+ {
+ ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks();
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_PendingTasks.contains(UpstreamData.TaskId))
+ {
+ // Pending task is already queued, return success
+ ApplyResult.Success = true;
+ return ApplyResult;
+ }
+ m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord);
+ }
+
+ CloudCacheSession ComputeSession(m_Client);
+ CloudCacheSession StorageSession(m_StorageClient);
+
+ {
+ CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs);
+ ApplyResult.Bytes += Result.Bytes;
+ ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
+ ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks();
+ if (!Result.Success)
+ {
+ ApplyResult.Error = {.ErrorCode = Result.ErrorCode,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"};
+ return ApplyResult;
+ }
+ UpstreamData.Blobs.clear();
+ }
+
+ {
+ CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects);
+ ApplyResult.Bytes += Result.Bytes;
+ ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
+ ApplyResult.Timepoints["zen-storage-upload-objects"] = DateTime::NowTicks();
+ if (!Result.Success)
+ {
+ ApplyResult.Error = {.ErrorCode = Result.ErrorCode,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"};
+ return ApplyResult;
+ }
+ }
+
+ {
+ PutRefResult RefResult = StorageSession.PutRef("requests"sv,
+ UpstreamData.TaskId,
+ UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
+ ZenContentType::kCbObject);
+ Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}",
+ UpstreamData.TaskId,
+ RefResult.Needs.size(),
+ RefResult.Bytes,
+ RefResult.ElapsedSeconds,
+ RefResult.Success);
+ ApplyResult.Bytes += RefResult.Bytes;
+ ApplyResult.ElapsedSeconds += RefResult.ElapsedSeconds;
+ ApplyResult.Timepoints["zen-storage-put-ref"] = DateTime::NowTicks();
+
+ if (RefResult.Needs.size() > 0)
+ {
+ Log().error("Failed to add task ref {} due to {} missing blobs", UpstreamData.TaskId, RefResult.Needs.size());
+ for (const auto& Hash : RefResult.Needs)
+ {
+ Log().debug("Task ref {} missing blob {}", UpstreamData.TaskId, Hash);
+ }
+
+ ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode,
+ .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason)
+ : "Failed to add task ref due to missing blob"};
+ return ApplyResult;
+ }
+
+ if (!RefResult.Success)
+ {
+ ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode,
+ .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"};
+ return ApplyResult;
+ }
+ UpstreamData.Objects.clear();
+ }
+
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("c"sv, m_ChannelId);
+ Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId);
+ Writer.BeginArray("t"sv);
+ Writer.AddObjectAttachment(UpstreamData.TaskId);
+ Writer.EndArray();
+ CbObject TasksObject = Writer.Save();
+ IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer();
+
+ CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData);
+ Log().debug("Post compute task {} Bytes={} Duration={}s Result={}",
+ TasksObject.GetHash(),
+ Result.Bytes,
+ Result.ElapsedSeconds,
+ Result.Success);
+ ApplyResult.Bytes += Result.Bytes;
+ ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
+ ApplyResult.Timepoints["zen-horde-post-task"] = DateTime::NowTicks();
+ if (!Result.Success)
+ {
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_PendingTasks.erase(UpstreamData.TaskId);
+ }
+
+ ApplyResult.Error = {.ErrorCode = Result.ErrorCode,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"};
+ return ApplyResult;
+ }
+ }
+
+ Log().info("Task posted {}", UpstreamData.TaskId);
+ ApplyResult.Success = true;
+ return ApplyResult;
+ }
+ catch (std::exception& Err)
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ }
+ }
+
+ [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs)
+ {
+ if (Blobs.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing blobs
+ std::set<IoHash> Keys;
+ std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
+
+ CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
+ Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
+ Keys.size(),
+ ExistsResult.Needs.size(),
+ ExistsResult.ElapsedSeconds,
+ ExistsResult.Success);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (!ExistsResult.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"};
+ }
+
+ for (const auto& Hash : ExistsResult.Needs)
+ {
+ CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash));
+ Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
+ [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
+ {
+ if (Objects.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing objects
+ std::set<IoHash> Keys;
+ std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
+
+ CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
+ Log().debug("Queried {} missing objects Need={} Duration={}s Result={}",
+ Keys.size(),
+ ExistsResult.Needs.size(),
+ ExistsResult.ElapsedSeconds,
+ ExistsResult.Success);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (!ExistsResult.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"};
+ }
+
+ for (const auto& Hash : ExistsResult.Needs)
+ {
+ CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
+ Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
+ enum class ComputeTaskState : int32_t
+ {
+ Queued = 0,
+ Executing = 1,
+ Complete = 2,
+ };
+
+ enum class ComputeTaskOutcome : int32_t
+ {
+ Success = 0,
+ Failed = 1,
+ Cancelled = 2,
+ NoResult = 3,
+ Exipred = 4,
+ BlobNotFound = 5,
+ Exception = 6,
+ };
+
+ [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome)
+ {
+ switch (Outcome)
+ {
+ case ComputeTaskState::Queued:
+ return "Queued"sv;
+ case ComputeTaskState::Executing:
+ return "Executing"sv;
+ case ComputeTaskState::Complete:
+ return "Complete"sv;
+ };
+ return "Unknown"sv;
+ }
+
+ [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
+ {
+ switch (Outcome)
+ {
+ case ComputeTaskOutcome::Success:
+ return "Success"sv;
+ case ComputeTaskOutcome::Failed:
+ return "Failed"sv;
+ case ComputeTaskOutcome::Cancelled:
+ return "Cancelled"sv;
+ case ComputeTaskOutcome::NoResult:
+ return "NoResult"sv;
+ case ComputeTaskOutcome::Exipred:
+ return "Exipred"sv;
+ case ComputeTaskOutcome::BlobNotFound:
+ return "BlobNotFound"sv;
+ case ComputeTaskOutcome::Exception:
+ return "Exception"sv;
+ };
+ return "Unknown"sv;
+ }
+
+ virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override
+ {
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_PendingTasks.empty())
+ {
+ if (m_CompletedTasks.empty())
+ {
+ // Nothing to do.
+ return {.Success = true};
+ }
+
+ UpstreamApplyCompleted CompletedTasks;
+ std::swap(CompletedTasks, m_CompletedTasks);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
+ }
+ }
+
+ try
+ {
+ CloudCacheSession ComputeSession(m_Client);
+
+ CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId);
+ Log().debug("Get compute updates Bytes={} Duration={}s Result={}",
+ UpdatesResult.Bytes,
+ UpdatesResult.ElapsedSeconds,
+ UpdatesResult.Success);
+ Bytes += UpdatesResult.Bytes;
+ ElapsedSeconds += UpdatesResult.ElapsedSeconds;
+ if (!UpdatesResult.Success)
+ {
+ return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!UpdatesResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response));
+
+ for (auto& It : TaskStatus["u"sv])
+ {
+ CbObjectView Status = It.AsObjectView();
+ IoHash TaskId = Status["h"sv].AsHash();
+ const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
+ const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32();
+
+ Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State));
+
+ // Only completed tasks need to be processed
+ if (State != ComputeTaskState::Complete)
+ {
+ continue;
+ }
+
+ IoHash WorkerId{};
+ IoHash ActionId{};
+ UpstreamApplyType ApplyType{};
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ auto TaskIt = m_PendingTasks.find(TaskId);
+ if (TaskIt != m_PendingTasks.end())
+ {
+ WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
+ ActionId = TaskIt->second.Action.GetHash();
+ ApplyType = TaskIt->second.Type;
+ m_PendingTasks.erase(TaskIt);
+ }
+ }
+
+ if (WorkerId == IoHash::Zero)
+ {
+ Log().warn("Task {} missing from pending tasks", TaskId);
+ continue;
+ }
+
+ std::map<std::string, uint64_t> Timepoints;
+ ProcessQueueTimings(Status["qs"sv].AsObjectView(), Timepoints);
+ ProcessExecuteTimings(Status["es"sv].AsObjectView(), Timepoints);
+
+ if (Outcome != ComputeTaskOutcome::Success)
+ {
+ const std::string_view Detail = Status["d"sv].AsString();
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_CompletedTasks[WorkerId][ActionId] = {
+ .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)},
+ .Timepoints = std::move(Timepoints)};
+ }
+ continue;
+ }
+
+ Timepoints["zen-complete-queue-added"] = DateTime::NowTicks();
+ ThreadPool.ScheduleWork([this,
+ ApplyType,
+ ResultHash = Status["r"sv].AsHash(),
+ Timepoints = std::move(Timepoints),
+ TaskId = std::move(TaskId),
+ WorkerId = std::move(WorkerId),
+ ActionId = std::move(ActionId)]() mutable {
+ Timepoints["zen-complete-queue-dispatched"] = DateTime::NowTicks();
+ GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash);
+ Timepoints["zen-complete-queue-complete"] = DateTime::NowTicks();
+ Result.Timepoints.merge(Timepoints);
+
+ Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}",
+ TaskId,
+ Result.OutputFiles.size(),
+ Result.OutputPackage.GetAttachments().size(),
+ Result.Error.ErrorCode);
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_CompletedTasks[WorkerId][ActionId] = std::move(Result);
+ }
+ });
+ }
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_CompletedTasks.empty())
+ {
+ // Nothing to do.
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+ UpstreamApplyCompleted CompletedTasks;
+ std::swap(CompletedTasks, m_CompletedTasks);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
+ }
+ }
+ catch (std::exception& Err)
+ {
+ m_HealthOk = false;
+ return {
+ .Error{.ErrorCode = -1, .Reason = Err.what()},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ };
+ }
+ }
+
// Accessor for the endpoint's aggregate transfer/processing statistics.
virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; }

private:
    // Convenience accessor for this endpoint's logger channel.
    spdlog::logger& Log() { return m_Log; }

    spdlog::logger& m_Log;                     // logger channel "upstream-apply"
    CasStore&       m_CasStore;                // source of worker executable/file chunks (by chunk hash)
    CidStore&       m_CidStore;                // source of action input chunks (by CID)
    AuthMgr&        m_AuthMgr;                 // used to acquire OpenId access tokens when configured
    std::string     m_DisplayName;             // "<name> - '<compute url>'+'<storage url>'"
    RefPtr<CloudCacheClient> m_Client;         // compute service client
    RefPtr<CloudCacheClient> m_StorageClient;  // storage service client
    UpstreamApplyEndpointStats m_Stats;
    std::atomic_bool m_HealthOk{false};        // cleared whenever a request throws
    std::string      m_ChannelId;              // per-session compute channel id ("zen-<session id>")

    std::mutex m_TaskMutex;  // guards m_PendingTasks and m_CompletedTasks
    std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks;
    UpstreamApplyCompleted m_CompletedTasks;

    // Working set produced while converting an apply record into a Horde task:
    // everything that must be uploaded, plus the ids of the task/requirements
    // objects that the submission references.
    struct UpstreamData
    {
        std::map<IoHash, IoBuffer> Blobs;    // binary attachments, keyed by hash
        std::map<IoHash, CbObject> Objects;  // CB objects (sandbox dirs, task, requirements), keyed by hash
        IoHash TaskId;                       // hash of the serialized task object
        IoHash RequirementsId;               // hash of the serialized requirements object
    };

    // In-memory directory node built from the flat input-file list before it
    // is serialized into a merkle tree of CB objects.
    struct UpstreamDirectory
    {
        std::filesystem::path Path;                            // path relative to the sandbox root
        std::map<std::string, UpstreamDirectory> Directories;  // child directories by name
        std::set<std::string> Files;                           // file names directly in this directory
    };
+
+ static void ProcessQueueTimings(CbObjectView QueueStats, std::map<std::string, uint64_t>& Timepoints)
+ {
+ uint64_t Ticks = QueueStats["t"sv].AsDateTimeTicks();
+ if (Ticks == 0)
+ {
+ return;
+ }
+
+ // Scope is an array of miliseconds after start time
+ // TODO: cleanup
+ Timepoints["horde-queue-added"] = Ticks;
+ int Index = 0;
+ for (auto& Item : QueueStats["s"sv].AsArrayView())
+ {
+ Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond;
+ switch (Index)
+ {
+ case 0:
+ Timepoints["horde-queue-dispatched"] = Ticks;
+ break;
+ case 1:
+ Timepoints["horde-queue-complete"] = Ticks;
+ break;
+ }
+ Index++;
+ }
+ }
+
+ static void ProcessExecuteTimings(CbObjectView ExecutionStats, std::map<std::string, uint64_t>& Timepoints)
+ {
+ uint64_t Ticks = ExecutionStats["t"sv].AsDateTimeTicks();
+ if (Ticks == 0)
+ {
+ return;
+ }
+
+ // Scope is an array of miliseconds after start time
+ // TODO: cleanup
+ Timepoints["horde-execution-start"] = Ticks;
+ int Index = 0;
+ for (auto& Item : ExecutionStats["s"sv].AsArrayView())
+ {
+ Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond;
+ switch (Index)
+ {
+ case 0:
+ Timepoints["horde-execution-download-ref"] = Ticks;
+ break;
+ case 1:
+ Timepoints["horde-execution-download-input"] = Ticks;
+ break;
+ case 2:
+ Timepoints["horde-execution-execute"] = Ticks;
+ break;
+ case 3:
+ Timepoints["horde-execution-upload-log"] = Ticks;
+ break;
+ case 4:
+ Timepoints["horde-execution-upload-output"] = Ticks;
+ break;
+ case 5:
+ Timepoints["horde-execution-upload-ref"] = Ticks;
+ break;
+ }
+ Index++;
+ }
+ }
+
/// Download and decode the result of a finished remote task.
///
/// Fetches the result ref object for ResultHash from the storage service,
/// downloads every blob it references (output tree, stdout, stderr and any
/// chunks the output object points at), then unpacks the data according to
/// ApplyType:
///  - Simple: resolves the output merkle tree into OutputFiles/FileData.
///  - Asset:  rebuilds a CbPackage from the Build.output object and the
///            "Outputs" directory tree.
/// Transfer byte counts, durations and timepoints are accumulated into the
/// returned GetUpstreamApplyResult. Failures are reported through the
/// result's Error field; exceptions are caught and converted as well.
[[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash)
{
    try
    {
        CloudCacheSession Session(m_StorageClient);

        GetUpstreamApplyResult ApplyResult{};

        IoHash StdOutHash;
        IoHash StdErrHash;
        IoHash OutputHash;

        // Every blob downloaded for this result, keyed by its hash.
        std::map<IoHash, IoBuffer> BinaryData;

        {
            // The result ref is a CB object carrying the exit code ("e") and
            // attachments for stdout ("so"), stderr ("se") and the output tree ("o").
            CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject);
            Log().debug("Get ref {} Bytes={} Duration={}s Result={}",
                        ResultHash,
                        ObjectRefResult.Bytes,
                        ObjectRefResult.ElapsedSeconds,
                        ObjectRefResult.Success);
            ApplyResult.Bytes += ObjectRefResult.Bytes;
            ApplyResult.ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
            ApplyResult.Timepoints["zen-storage-get-ref"] = DateTime::NowTicks();

            if (!ObjectRefResult.Success)
            {
                ApplyResult.Error.Reason = "Failed to get result object data";
                return ApplyResult;
            }

            CbObject ResultObject = LoadCompactBinaryObject(ObjectRefResult.Response);
            ApplyResult.Error.ErrorCode = ResultObject["e"sv].AsInt32();
            StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
            StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
            OutputHash = ResultObject["o"sv].AsObjectAttachment();
        }

        {
            // Collect everything the output object references, then download
            // each needed blob individually.
            std::set<IoHash> NeededData;
            if (OutputHash != IoHash::Zero)
            {
                GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash);
                Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}",
                            ResultHash,
                            ObjectReferenceResult.References.size(),
                            ObjectReferenceResult.Bytes,
                            ObjectReferenceResult.ElapsedSeconds,
                            ObjectReferenceResult.Success);
                ApplyResult.Bytes += ObjectReferenceResult.Bytes;
                ApplyResult.ElapsedSeconds += ObjectReferenceResult.ElapsedSeconds;
                ApplyResult.Timepoints["zen-storage-get-object-references"] = DateTime::NowTicks();

                if (!ObjectReferenceResult.Success)
                {
                    ApplyResult.Error.Reason = "Failed to get result object references";
                    return ApplyResult;
                }

                NeededData = std::move(ObjectReferenceResult.References);
            }

            // Absent attachments are IoHash::Zero; they are inserted here and
            // skipped by the zero check in the download loop below.
            NeededData.insert(OutputHash);
            NeededData.insert(StdOutHash);
            NeededData.insert(StdErrHash);

            for (const auto& Hash : NeededData)
            {
                if (Hash == IoHash::Zero)
                {
                    continue;
                }
                CloudCacheResult BlobResult = Session.GetBlob(Hash);
                Log().debug("Get blob {} Bytes={} Duration={}s Result={}",
                            Hash,
                            BlobResult.Bytes,
                            BlobResult.ElapsedSeconds,
                            BlobResult.Success);
                ApplyResult.Bytes += BlobResult.Bytes;
                ApplyResult.ElapsedSeconds += BlobResult.ElapsedSeconds;
                if (!BlobResult.Success)
                {
                    ApplyResult.Error.Reason = "Failed to get blob";
                    return ApplyResult;
                }
                BinaryData[Hash] = std::move(BlobResult.Response);
            }
            ApplyResult.Timepoints["zen-storage-get-blobs"] = DateTime::NowTicks();
        }

        // Surface the captured console output, when present.
        ApplyResult.StdOut = StdOutHash != IoHash::Zero
                                 ? std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize())
                                 : "";
        ApplyResult.StdErr = StdErrHash != IoHash::Zero
                                 ? std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize())
                                 : "";

        if (OutputHash == IoHash::Zero)
        {
            ApplyResult.Error.Reason = "Task completed with no output object";
            return ApplyResult;
        }

        CbObject OutputObject = LoadCompactBinaryObject(BinaryData[OutputHash]);

        switch (ApplyType)
        {
            case UpstreamApplyType::Simple:
            {
                // Flatten the output merkle tree into path -> hash entries and
                // hand all downloaded blobs over to the caller. Moving the
                // mapped values while iterating is safe: the map's structure
                // is not modified, only its values.
                ResolveMerkleTreeDirectory(""sv, OutputHash, BinaryData, ApplyResult.OutputFiles);
                for (const auto& Pair : BinaryData)
                {
                    ApplyResult.FileData[Pair.first] = std::move(BinaryData.at(Pair.first));
                }

                ApplyResult.Success = ApplyResult.Error.ErrorCode == 0;
                return ApplyResult;
            }
            break;
            case UpstreamApplyType::Asset:
            {
                if (ApplyResult.Error.ErrorCode != 0)
                {
                    ApplyResult.Error.Reason = "Task completed with errors";
                    return ApplyResult;
                }

                // Get build.output: the CB object produced by the worker,
                // listed among the task's output files ("f").
                IoHash BuildOutputId;
                IoBuffer BuildOutput;
                for (auto& It : OutputObject["f"sv])
                {
                    const CbObjectView FileObject = It.AsObjectView();
                    if (FileObject["n"sv].AsString() == "Build.output"sv)
                    {
                        BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
                        BuildOutput = BinaryData[BuildOutputId];
                        break;
                    }
                }

                if (BuildOutput.GetSize() == 0)
                {
                    ApplyResult.Error.Reason = "Build.output file not found in task results";
                    return ApplyResult;
                }

                // Get Output directory node: the "Outputs" entry among the
                // task's output directories ("d").
                IoBuffer OutputDirectoryTree;
                for (auto& It : OutputObject["d"sv])
                {
                    const CbObjectView DirectoryObject = It.AsObjectView();
                    if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
                    {
                        OutputDirectoryTree = BinaryData[DirectoryObject["h"sv].AsObjectAttachment()];
                        break;
                    }
                }

                if (OutputDirectoryTree.GetSize() == 0)
                {
                    ApplyResult.Error.Reason = "Outputs directory not found in task results";
                    return ApplyResult;
                }

                // load build.output as CbObject

                // Move Outputs from Horde to CbPackage

                // The "Outputs" directory names each file by its uncompressed
                // hash; the attachment hash is how the (compressed) data is
                // stored in Horde. Build the mapping between the two.
                std::unordered_map<IoHash, IoHash> CidToCompressedId;
                CbPackage OutputPackage;
                CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);

                for (auto& It : OutputDirectoryTreeObject["f"sv])
                {
                    CbObjectView FileObject = It.AsObjectView();
                    // Name is the uncompressed hash
                    IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
                    // Hash is the compressed data hash, and how it is stored in Horde
                    IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();

                    if (!BinaryData.contains(CompressedId))
                    {
                        Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId);
                        ApplyResult.Error.Reason = "Object attachment chunk not retrieved from Horde";
                        return ApplyResult;
                    }
                    CidToCompressedId[DecompressedId] = CompressedId;
                }

                // Iterate attachments, verify all chunks exist, and add to CbPackage
                bool AnyErrors = false;
                CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
                BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
                    const IoHash DecompressedId = Field.AsHash();
                    if (!CidToCompressedId.contains(DecompressedId))
                    {
                        Log().warn("Attachment not found {}", DecompressedId);
                        AnyErrors = true;
                        return;
                    }
                    const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);

                    if (!BinaryData.contains(CompressedId))
                    {
                        Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId);
                        AnyErrors = true;
                        return;
                    }

                    CompressedBuffer AttachmentBuffer =
                        CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]));

                    if (!AttachmentBuffer)
                    {
                        Log().warn(
                            "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
                            CompressedId,
                            DecompressedId);
                        AnyErrors = true;
                        return;
                    }

                    ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
                    ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();

                    CbAttachment Attachment(AttachmentBuffer);
                    OutputPackage.AddAttachment(Attachment);
                });

                if (AnyErrors)
                {
                    ApplyResult.Error.Reason = "Failed to get result object attachment data";
                    return ApplyResult;
                }

                OutputPackage.SetObject(BuildOutputObject);
                ApplyResult.OutputPackage = std::move(OutputPackage);

                ApplyResult.Success = ApplyResult.Error.ErrorCode == 0;
                return ApplyResult;
            }
            break;
        }

        ApplyResult.Error.Reason = "Unknown apply type";
        return ApplyResult;
    }
    catch (std::exception& Err)
    {
        // Convert any exception into an error result so the caller's
        // completion bookkeeping still runs.
        return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
    }
}
+
/// Convert a pending apply record into the uploadable pieces of a Horde
/// compute task: the sandbox merkle tree (from the worker's executables,
/// files and the action's inputs), the requirements object and the task
/// object itself, all collected into Data. Returns false — with a warning
/// logged — if any required input is missing or malformed.
[[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
{
    std::string ExecutablePath;
    std::string WorkingDirectory;
    std::vector<std::string> Arguments;
    std::map<std::string, std::string> Environment;
    std::set<std::filesystem::path> InputFiles;
    std::set<std::string> Outputs;
    std::map<std::filesystem::path, IoHash> InputFileHashes;

    // The worker descriptor must at least name the executable to run.
    ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString();
    if (ExecutablePath.empty())
    {
        Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor",
                   ApplyRecord.WorkerDescriptor.GetHash());
        return false;
    }

    WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString();

    // Stage the worker's executables from the local CAS store.
    for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
    {
        CbObjectView FileEntry = It.AsObjectView();
        if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
        {
            return false;
        }
    }

    // Stage the worker's support files the same way.
    for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
    {
        CbObjectView FileEntry = It.AsObjectView();
        if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
        {
            return false;
        }
    }

    // Directories are represented in the (file-only) sandbox tree by placing
    // an empty placeholder file inside each one.
    for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv])
    {
        std::string_view Directory = It.AsString();
        std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory);
        InputFiles.insert(DummyFile);
        Data.Blobs[EmptyBufferId] = EmptyBuffer;
        InputFileHashes[DummyFile] = EmptyBufferId;
    }

    // Ensure the working directory itself exists in the sandbox.
    if (!WorkingDirectory.empty())
    {
        std::string DummyFile = fmt::format("{}/.zen_empty_file", WorkingDirectory);
        InputFiles.insert(DummyFile);
        Data.Blobs[EmptyBufferId] = EmptyBuffer;
        InputFileHashes[DummyFile] = EmptyBufferId;
    }

    // Environment entries are "NAME=VALUE" strings; split on the first '='.
    for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
    {
        std::string_view Env = It.AsString();
        auto Index = Env.find('=');
        if (Index == std::string_view::npos)
        {
            Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
            return false;
        }

        Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
    }

    switch (ApplyRecord.Type)
    {
        case UpstreamApplyType::Simple:
        {
            // Simple tasks take their arguments and output list verbatim from
            // the worker descriptor.
            for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv])
            {
                Arguments.push_back(std::string(It.AsString()));
            }

            for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv])
            {
                Outputs.insert(std::string(It.AsString()));
            }
        }
        break;
        case UpstreamApplyType::Asset:
        {
            // Asset tasks run the worker against a staged Build.action file
            // and collect Build.output plus an Outputs directory.
            static const std::filesystem::path BuildActionPath = "Build.action"sv;
            static const std::filesystem::path InputPath = "Inputs"sv;
            const IoHash ActionId = ApplyRecord.Action.GetHash();

            Arguments.push_back("-Build=build.action");
            Outputs.insert("Build.output");
            Outputs.insert("Outputs");

            InputFiles.insert(BuildActionPath);
            InputFileHashes[BuildActionPath] = ActionId;
            Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
                                                                        ApplyRecord.Action.GetBuffer().GetSize());

            // Stage every attachment of the action under Inputs/<cid>, named
            // by its CID but keyed by the hash of the bytes as stored in the
            // CID store (presumably the compressed form — the variable name
            // suggests so; TODO confirm against CidStore semantics).
            bool AnyErrors = false;
            ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
                const IoHash Cid = Field.AsHash();
                const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
                IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);

                if (!DataBuffer)
                {
                    Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
                    AnyErrors = true;
                    return;
                }

                if (InputFiles.contains(FilePath))
                {
                    // Same attachment referenced twice; already staged.
                    return;
                }

                const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());

                InputFiles.insert(FilePath);
                InputFileHashes[FilePath] = CompressedId;
                Data.Blobs[CompressedId] = std::move(DataBuffer);
            });

            if (AnyErrors)
            {
                return false;
            }
        }
        break;
    }

    // Serialize the sandbox directory tree; every directory object (including
    // the root) is added to Data.Objects for upload.
    const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);

    CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects);
    const IoHash SandboxHash = Sandbox.GetHash();
    Data.Objects[SandboxHash] = std::move(Sandbox);

    {
        // Scheduling requirements: restrict to the worker's host platform and
        // forward core/memory needs when provided.
        std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
        if (HostPlatform.empty())
        {
            Log().warn("process apply upstream FAILED, 'host' platform not provided");
            return false;
        }

        int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32();
        int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64();
        bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();

        std::string Condition = fmt::format("Platform == '{}'", HostPlatform);
        if (HostPlatform == "Win64")
        {
            // TODO
            // Condition += " && Pool == 'Win-RemoteExec'";
        }

        std::map<std::string_view, int64_t> Resources;
        if (LogicalCores > 0)
        {
            Resources["LogicalCores"sv] = LogicalCores;
        }
        if (Memory > 0)
        {
            // Bytes -> GiB, rounded down but never below 1.
            Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL);
        }

        CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
        const IoHash RequirementsId = Requirements.GetHash();
        Data.Objects[RequirementsId] = std::move(Requirements);
        Data.RequirementsId = RequirementsId;
    }

    // Finally, serialize the task object referencing the sandbox and the
    // requirements; its hash identifies the task from here on.
    CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs);

    const IoHash TaskId = Task.GetHash();
    Data.Objects[TaskId] = std::move(Task);
    Data.TaskId = TaskId;

    return true;
}
+
+ [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
+ std::set<std::filesystem::path>& InputFiles,
+ std::map<std::filesystem::path, IoHash>& InputFileHashes,
+ std::map<IoHash, IoBuffer>& Blobs)
+ {
+ const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
+ const IoHash ChunkId = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+ IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId);
+
+ if (!DataBuffer)
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
+ return false;
+ }
+
+ if (DataBuffer.Size() != Size)
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}",
+ ChunkId,
+ DataBuffer.Size(),
+ Size);
+ return false;
+ }
+
+ if (InputFiles.contains(FilePath))
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
+ return false;
+ }
+
+ InputFiles.insert(FilePath);
+ InputFileHashes[FilePath] = ChunkId;
+ Blobs[ChunkId] = std::move(DataBuffer);
+ return true;
+ }
+
/// Build an in-memory directory tree from the flat set of input file paths.
/// Interior nodes are owned by their parent's Directories map; the root is
/// returned by value. AllDirectories caches raw pointers into those maps,
/// which is safe because std::map never relocates its nodes on insertion.
/// NOTE(review): paths are assumed to be sandbox-relative; an absolute path
/// would decompose down to the filesystem root — confirm callers never pass one.
[[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles)
{
    static const std::filesystem::path RootPath;
    std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories;  // lookup cache: full path -> node
    UpstreamDirectory RootDirectory = {.Path = RootPath};

    AllDirectories[RootPath] = &RootDirectory;

    // Build tree from flat list
    for (const auto& Path : InputFiles)
    {
        if (Path.has_parent_path())
        {
            if (!AllDirectories.contains(Path.parent_path()))
            {
                // Decompose the parent path into its components, outermost
                // component ending up on top of the stack.
                std::stack<std::string> PathSplit;
                {
                    std::filesystem::path ParentPath = Path.parent_path();
                    PathSplit.push(ParentPath.filename().string());
                    while (ParentPath.has_parent_path())
                    {
                        ParentPath = ParentPath.parent_path();
                        PathSplit.push(ParentPath.filename().string());
                    }
                }
                // Walk down from the root, creating any missing intermediate
                // directories and registering them in the cache.
                UpstreamDirectory* ParentPtr = &RootDirectory;
                while (!PathSplit.empty())
                {
                    if (!ParentPtr->Directories.contains(PathSplit.top()))
                    {
                        std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()};
                        ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath};
                        AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()];
                    }
                    ParentPtr = &ParentPtr->Directories[PathSplit.top()];
                    PathSplit.pop();
                }
            }

            AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string());
        }
        else
        {
            // Top-level file: attach directly to the root.
            RootDirectory.Files.insert(Path.filename().string());
        }
    }

    return RootDirectory;
}
+
/// Serialize one UpstreamDirectory node into a compact binary object,
/// recursively serializing its children first so they can be referenced by
/// hash. Every child directory object is added to Objects (keyed by hash).
/// Files are written before directories ("f" array, then "d"); the field
/// order determines the serialized bytes and therefore the object hash.
[[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
                                                const std::map<std::filesystem::path, IoHash>& InputFileHashes,
                                                const std::map<IoHash, IoBuffer>& Blobs,
                                                std::map<IoHash, CbObject>& Objects)
{
    CbObjectWriter DirectoryTreeWriter;

    if (!RootDirectory.Files.empty())
    {
        DirectoryTreeWriter.BeginArray("f"sv);
        for (const auto& File : RootDirectory.Files)
        {
            const std::filesystem::path FilePath = {RootDirectory.Path / File};
            const IoHash& FileHash = InputFileHashes.at(FilePath);
            const uint64_t FileSize = Blobs.at(FileHash).Size();
            DirectoryTreeWriter.BeginObject();
            DirectoryTreeWriter.AddString("n"sv, File);                // Name
            DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);  // Content hash
            DirectoryTreeWriter.AddInteger("s"sv, FileSize);           // Size
            // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
            DirectoryTreeWriter.EndObject();
        }
        DirectoryTreeWriter.EndArray();
    }

    if (!RootDirectory.Directories.empty())
    {
        DirectoryTreeWriter.BeginArray("d"sv);
        for (const auto& Item : RootDirectory.Directories)
        {
            // Serialize the child first so its hash can be referenced here.
            CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
            const IoHash DirectoryHash = Directory.GetHash();
            Objects[DirectoryHash] = std::move(Directory);

            DirectoryTreeWriter.BeginObject();
            DirectoryTreeWriter.AddString("n"sv, Item.first);               // Name
            DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash);  // Child object hash
            DirectoryTreeWriter.EndObject();
        }
        DirectoryTreeWriter.EndArray();
    }

    return DirectoryTreeWriter.Save();
}
+
+ void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory,
+ const IoHash& DirectoryHash,
+ const std::map<IoHash, IoBuffer>& Objects,
+ std::map<std::filesystem::path, IoHash>& OutputFiles)
+ {
+ CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash));
+
+ for (auto& It : Directory["f"sv])
+ {
+ const CbObjectView FileObject = It.AsObjectView();
+ const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString();
+
+ OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment();
+ }
+
+ for (auto& It : Directory["d"sv])
+ {
+ const CbObjectView DirectoryObject = It.AsObjectView();
+
+ ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(),
+ DirectoryObject["h"sv].AsObjectAttachment(),
+ Objects,
+ OutputFiles);
+ }
+ }
+
+ [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
+ const std::map<std::string_view, int64_t>& Resources,
+ const bool Exclusive)
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("c", Condition);
+ if (!Resources.empty())
+ {
+ Writer.BeginArray("r");
+ for (const auto& Resource : Resources)
+ {
+ Writer.BeginArray();
+ Writer.AddString(Resource.first);
+ Writer.AddInteger(Resource.second);
+ Writer.EndArray();
+ }
+ Writer.EndArray();
+ }
+ Writer.AddBool("e", Exclusive);
+ return Writer.Save();
+ }
+
+ [[nodiscard]] CbObject BuildTask(const std::string_view Executable,
+ const std::vector<std::string>& Arguments,
+ const std::map<std::string, std::string>& Environment,
+ const std::string_view WorkingDirectory,
+ const IoHash& SandboxHash,
+ const IoHash& RequirementsId,
+ const std::set<std::string>& Outputs)
+ {
+ CbObjectWriter TaskWriter;
+ TaskWriter.AddString("e"sv, Executable);
+
+ if (!Arguments.empty())
+ {
+ TaskWriter.BeginArray("a"sv);
+ for (const auto& Argument : Arguments)
+ {
+ TaskWriter.AddString(Argument);
+ }
+ TaskWriter.EndArray();
+ }
+
+ if (!Environment.empty())
+ {
+ TaskWriter.BeginArray("v"sv);
+ for (const auto& Env : Environment)
+ {
+ TaskWriter.BeginArray();
+ TaskWriter.AddString(Env.first);
+ TaskWriter.AddString(Env.second);
+ TaskWriter.EndArray();
+ }
+ TaskWriter.EndArray();
+ }
+
+ if (!WorkingDirectory.empty())
+ {
+ TaskWriter.AddString("w"sv, WorkingDirectory);
+ }
+
+ TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
+ TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
+
+ // Outputs
+ if (!Outputs.empty())
+ {
+ TaskWriter.BeginArray("o"sv);
+ for (const auto& Output : Outputs)
+ {
+ TaskWriter.AddString(Output);
+ }
+ TaskWriter.EndArray();
+ }
+
+ return TaskWriter.Save();
+ }
+ };
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<UpstreamApplyEndpoint>
+UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ CasStore& CasStore,
+ CidStore& CidStore,
+ AuthMgr& Mgr)
+{
+ return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions,
+ ComputeAuthConfig,
+ StorageOptions,
+ StorageAuthConfig,
+ CasStore,
+ CidStore,
+ Mgr);
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 7eef96556..4bec41a29 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -540,6 +540,50 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
/// Query the storage service for the set of hashes referenced by the object
/// stored under Key (GET /api/v1/objects/<namespace>/<hash>/references).
/// On success, Result.References holds the hashes parsed from the
/// "references" array of the compact-binary response.
GetObjectReferencesResult
CloudCacheSession::GetObjectReferences(const IoHash& Key)
{
    ZEN_TRACE_CPU("HordeClient::GetObjectReferences");

    ExtendableStringBuilder<256> Uri;
    Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString()
        << "/references";

    cpr::Session& Session = GetSession();
    const CloudCacheAccessToken& AccessToken = GetAccessToken();

    Session.SetOption(cpr::Url{Uri.c_str()});
    Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
    // NOTE(review): empty body set explicitly — presumably to reset state on
    // the reused session object; confirm against other methods on this class.
    Session.SetOption(cpr::Body{});

    cpr::Response Response = Session.Get();
    ZEN_DEBUG("GET {}", Response);

    if (Response.error)
    {
        // Transport-level failure (connect, TLS, timeout, ...).
        return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}};
    }
    else if (!VerifyAccessToken(Response.status_code))
    {
        return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
    }

    GetObjectReferencesResult Result{
        CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}};

    if (Result.Success)
    {
        // Wrap the response body without copying; the parsed object is only
        // used within this scope, while Response is still alive.
        IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
        const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer);
        for (auto& Item : ReferencesResponse["references"sv])
        {
            Result.References.insert(Item.AsHash());
        }
    }

    return Result;
}
+
CloudCacheResult
CloudCacheSession::BlobExists(const IoHash& Key)
{
@@ -603,7 +647,7 @@ CloudCacheSession::PostComputeTasks(IoBuffer TasksData)
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
CloudCacheResult
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index bc0d84506..cff9a9ef1 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -76,6 +76,11 @@ struct CloudCacheExistsResult : CloudCacheResult
std::set<IoHash> Needs;
};
+struct GetObjectReferencesResult : CloudCacheResult
+{
+ std::set<IoHash> References;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -108,6 +113,8 @@ public:
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
+ GetObjectReferencesResult GetObjectReferences(const IoHash& Key);
+
CloudCacheResult BlobExists(const IoHash& Key);
CloudCacheResult CompressedBlobExists(const IoHash& Key);
CloudCacheResult ObjectExists(const IoHash& Key);
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 17a6bb3cf..9758e7565 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -4,1303 +4,26 @@
#if ZEN_WITH_COMPUTE_SERVICES
-# include "jupiter.h"
-# include "zen.h"
-
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compactbinaryvalidation.h>
-# include <zencore/compress.h>
# include <zencore/fmtutils.h>
-# include <zencore/session.h>
-# include <zencore/stats.h>
# include <zencore/stream.h>
-# include <zencore/thread.h>
# include <zencore/timer.h>
# include <zencore/workthreadpool.h>
# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
-# include <auth/authmgr.h>
-# include <upstream/upstreamcache.h>
-
-# include "cache/structuredcachestore.h"
# include "diag/logging.h"
# include <fmt/format.h>
-# include <algorithm>
# include <atomic>
-# include <set>
-# include <stack>
-# include <thread>
-# include <unordered_map>
namespace zen {
using namespace std::literals;
-static const IoBuffer EmptyBuffer;
-static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer);
-
-namespace detail {
-
- class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
- {
- public:
- HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
- CidStore& CidStore,
- AuthMgr& Mgr)
- : m_Log(logging::Get("upstream-apply"))
- , m_CasStore(CasStore)
- , m_CidStore(CidStore)
- , m_AuthMgr(Mgr)
- {
- m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl);
- m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString());
-
- {
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
-
- if (ComputeAuthConfig.OAuthUrl.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl,
- .ClientId = ComputeAuthConfig.OAuthClientId,
- .ClientSecret = ComputeAuthConfig.OAuthClientSecret});
- }
- else if (ComputeAuthConfig.OpenIdProvider.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
- }
- else
- {
- CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
- }
-
- m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider));
- }
-
- {
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
-
- if (StorageAuthConfig.OAuthUrl.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl,
- .ClientId = StorageAuthConfig.OAuthClientId,
- .ClientSecret = StorageAuthConfig.OAuthClientSecret});
- }
- else if (StorageAuthConfig.OpenIdProvider.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
- }
- else
- {
- CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
- }
-
- m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider));
- }
- }
-
- virtual ~HordeUpstreamApplyEndpoint() = default;
-
- virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
-
- virtual bool IsHealthy() const override { return m_HealthOk.load(); }
-
- virtual UpstreamEndpointHealth CheckHealth() override
- {
- try
- {
- CloudCacheSession Session(m_Client);
- CloudCacheResult Result = Session.Authenticate();
-
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
- }
- catch (std::exception& Err)
- {
- return {.Reason = Err.what(), .Ok = false};
- }
- }
-
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
- virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override
- {
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- try
- {
- UpstreamData UpstreamData;
- if (!ProcessApplyKey(ApplyRecord, UpstreamData))
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}};
- }
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- if (m_PendingTasks.contains(UpstreamData.TaskId))
- {
- // Pending task is already queued, return success
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
- m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord);
- }
-
- CloudCacheSession ComputeSession(m_Client);
- CloudCacheSession StorageSession(m_StorageClient);
-
- {
- CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- UpstreamData.Blobs.clear();
- }
-
- {
- CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
- PutRefResult RefResult = StorageSession.PutRef("requests"sv,
- UpstreamData.TaskId,
- UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
- ZenContentType::kCbObject);
- Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}",
- UpstreamData.TaskId,
- RefResult.Needs.size(),
- RefResult.Bytes,
- RefResult.ElapsedSeconds,
- RefResult.Success);
-
- Bytes += RefResult.Bytes;
- ElapsedSeconds += RefResult.ElapsedSeconds;
- if (!RefResult.Success)
- {
- return {.Error{.ErrorCode = RefResult.ErrorCode ? RefResult.ErrorCode : -1,
- .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- UpstreamData.Objects.clear();
- }
-
- CbObjectWriter Writer;
- Writer.AddString("c"sv, m_ChannelId);
- Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId);
- Writer.BeginArray("t"sv);
- Writer.AddObjectAttachment(UpstreamData.TaskId);
- Writer.EndArray();
- CbObject TasksObject = Writer.Save();
- IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer();
-
- CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData);
- Log().debug("Post compute task {} Bytes={} Duration={}s Result={}",
- TasksObject.GetHash(),
- Result.Bytes,
- Result.ElapsedSeconds,
- Result.Success);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- {
- std::scoped_lock Lock(m_TaskMutex);
- m_PendingTasks.erase(UpstreamData.TaskId);
- }
-
- return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
- Log().info("Task posted {}", UpstreamData.TaskId);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
- catch (std::exception& Err)
- {
- m_HealthOk = false;
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
- }
- }
-
- [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs)
- {
- if (Blobs.size() == 0)
- {
- return {.Success = true};
- }
-
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- // Batch check for missing blobs
- std::set<IoHash> Keys;
- std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
-
- CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
- Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
- Keys.size(),
- ExistsResult.Needs.size(),
- ExistsResult.ElapsedSeconds,
- ExistsResult.Success);
- ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (!ExistsResult.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"};
- }
-
- // TODO: Batch upload missing blobs
-
- for (const auto& Hash : ExistsResult.Needs)
- {
- CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash));
- Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"};
- }
- }
-
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
-
- [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
- {
- if (Objects.size() == 0)
- {
- return {.Success = true};
- }
-
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- // Batch check for missing objects
- std::set<IoHash> Keys;
- std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
-
- // Todo: Endpoint doesn't exist for objects
- CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
- Log().debug("Queried {} missing objects Need={} Duration={}s Result={}",
- Keys.size(),
- ExistsResult.Needs.size(),
- ExistsResult.ElapsedSeconds,
- ExistsResult.Success);
- ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (!ExistsResult.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"};
- }
-
- // TODO: Batch upload missing objects
-
- for (const auto& Hash : ExistsResult.Needs)
- {
- CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
- Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"};
- }
- }
-
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
-
- enum class ComputeTaskState : int32_t
- {
- Queued = 0,
- Executing = 1,
- Complete = 2,
- };
-
- enum class ComputeTaskOutcome : int32_t
- {
- Success = 0,
- Failed = 1,
- Cancelled = 2,
- NoResult = 3,
- Exipred = 4,
- BlobNotFound = 5,
- Exception = 6,
- };
-
- [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome)
- {
- switch (Outcome)
- {
- case ComputeTaskState::Queued:
- return "Queued"sv;
- case ComputeTaskState::Executing:
- return "Executing"sv;
- case ComputeTaskState::Complete:
- return "Complete"sv;
- };
- return "Unknown"sv;
- }
-
- [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
- {
- switch (Outcome)
- {
- case ComputeTaskOutcome::Success:
- return "Success"sv;
- case ComputeTaskOutcome::Failed:
- return "Failed"sv;
- case ComputeTaskOutcome::Cancelled:
- return "Cancelled"sv;
- case ComputeTaskOutcome::NoResult:
- return "NoResult"sv;
- case ComputeTaskOutcome::Exipred:
- return "Exipred"sv;
- case ComputeTaskOutcome::BlobNotFound:
- return "BlobNotFound"sv;
- case ComputeTaskOutcome::Exception:
- return "Exception"sv;
- };
- return "Unknown"sv;
- }
-
- virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override
- {
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- if (m_PendingTasks.empty())
- {
- if (m_CompletedTasks.empty())
- {
- // Nothing to do.
- return {.Success = true};
- }
-
- UpstreamApplyCompleted CompletedTasks;
- std::swap(CompletedTasks, m_CompletedTasks);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
- }
- }
-
- try
- {
- CloudCacheSession ComputeSession(m_Client);
-
- CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId);
- Log().debug("Get compute updates Bytes={} Duration={}s Result={}",
- UpdatesResult.Bytes,
- UpdatesResult.ElapsedSeconds,
- UpdatesResult.Success);
- Bytes += UpdatesResult.Bytes;
- ElapsedSeconds += UpdatesResult.ElapsedSeconds;
- if (!UpdatesResult.Success)
- {
- return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
- if (!UpdatesResult.Success)
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
- }
-
- CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response));
-
- for (auto& It : TaskStatus["u"sv])
- {
- CbObjectView Status = It.AsObjectView();
- IoHash TaskId = Status["h"sv].AsHash();
- const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
- const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32();
-
- Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State));
-
- // Only completed tasks need to be processed
- if (State != ComputeTaskState::Complete)
- {
- continue;
- }
-
- IoHash WorkerId{};
- IoHash ActionId{};
- UpstreamApplyType ApplyType{};
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- auto TaskIt = m_PendingTasks.find(TaskId);
- if (TaskIt != m_PendingTasks.end())
- {
- WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
- ActionId = TaskIt->second.Action.GetHash();
- ApplyType = TaskIt->second.Type;
- m_PendingTasks.erase(TaskIt);
- }
- }
-
- if (WorkerId == IoHash::Zero)
- {
- Log().warn("Task {} missing from pending tasks", TaskId);
- continue;
- }
-
- if (Outcome != ComputeTaskOutcome::Success)
- {
- const std::string_view Detail = Status["d"sv].AsString();
- {
- std::scoped_lock Lock(m_TaskMutex);
- m_CompletedTasks[WorkerId][ActionId] = {
- .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}};
- }
- continue;
- }
-
- ThreadPool.ScheduleWork([this,
- ApplyType,
- ResultHash = Status["r"sv].AsHash(),
- TaskId = std::move(TaskId),
- WorkerId = std::move(WorkerId),
- ActionId = std::move(ActionId)]() {
- GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash);
- Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}",
- TaskId,
- Result.OutputFiles.size(),
- Result.OutputPackage.GetAttachments().size(),
- Result.Error.ErrorCode);
- {
- std::scoped_lock Lock(m_TaskMutex);
- m_CompletedTasks[WorkerId][ActionId] = std::move(Result);
- }
- });
- }
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- if (m_CompletedTasks.empty())
- {
- // Nothing to do.
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
- UpstreamApplyCompleted CompletedTasks;
- std::swap(CompletedTasks, m_CompletedTasks);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
- }
- }
- catch (std::exception& Err)
- {
- m_HealthOk = false;
- return {
- .Error{.ErrorCode = -1, .Reason = Err.what()},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- };
- }
- }
-
- virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; }
-
- private:
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- CasStore& m_CasStore;
- CidStore& m_CidStore;
- AuthMgr& m_AuthMgr;
- std::string m_DisplayName;
- RefPtr<CloudCacheClient> m_Client;
- RefPtr<CloudCacheClient> m_StorageClient;
- UpstreamApplyEndpointStats m_Stats;
- std::atomic_bool m_HealthOk{false};
- std::string m_ChannelId;
-
- std::mutex m_TaskMutex;
- std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks;
- UpstreamApplyCompleted m_CompletedTasks;
-
- struct UpstreamData
- {
- std::map<IoHash, IoBuffer> Blobs;
- std::map<IoHash, CbObject> Objects;
- IoHash TaskId;
- IoHash RequirementsId;
- };
-
- struct UpstreamDirectory
- {
- std::filesystem::path Path;
- std::map<std::string, UpstreamDirectory> Directories;
- std::set<std::string> Files;
- };
-
- [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash)
- {
- try
- {
- CloudCacheSession Session(m_StorageClient);
-
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- // Get Result object and all Object Attachments + Binary Attachment IDs
- CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject);
- Log().debug("Get ref {} Bytes={} Duration={}s Result={}",
- ResultHash,
- ObjectRefResult.Bytes,
- ObjectRefResult.ElapsedSeconds,
- ObjectRefResult.Success);
- Bytes += ObjectRefResult.Bytes;
- ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
-
- if (!ObjectRefResult.Success)
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
- std::vector<IoHash> ObjectsToIterate;
- std::map<IoHash, IoBuffer> ObjectData;
- std::map<IoHash, IoBuffer> BinaryData;
-
- ObjectData[ResultHash] = ObjectRefResult.Response;
- CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]);
- Object.IterateAttachments([&](CbFieldView Field) {
- if (Field.IsObjectAttachment())
- {
- const IoHash AttachmentHash = Field.AsObjectAttachment();
- if (!ObjectData.contains(AttachmentHash))
- {
- ObjectsToIterate.push_back(AttachmentHash);
- }
- }
- else if (Field.IsBinaryAttachment())
- {
- const IoHash AttachmentHash = Field.AsBinaryAttachment();
- BinaryData[AttachmentHash] = {};
- }
- });
-
- while (!ObjectsToIterate.empty())
- {
- const IoHash Hash = ObjectsToIterate.back();
- ObjectsToIterate.pop_back();
-
- CloudCacheResult ObjectResult = Session.GetObject(Hash);
- Log().debug("Get object {} Bytes={} Duration={}s Result={}",
- Hash,
- ObjectResult.Bytes,
- ObjectResult.ElapsedSeconds,
- ObjectResult.Success);
- Bytes += ObjectRefResult.Bytes;
- ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
- if (!ObjectResult.Success)
- {
- return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- ObjectData[Hash] = std::move(ObjectResult.Response);
-
- CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]);
- IterateObject.IterateAttachments([&](CbFieldView Field) {
- if (Field.IsObjectAttachment())
- {
- const IoHash AttachmentHash = Field.AsObjectAttachment();
- if (!ObjectData.contains(AttachmentHash))
- {
- ObjectsToIterate.push_back(AttachmentHash);
- }
- }
- else if (Field.IsBinaryAttachment())
- {
- const IoHash AttachmentHash = Field.AsBinaryAttachment();
- BinaryData[AttachmentHash] = {};
- }
- });
- }
-
- // Batch load all binary data
- for (auto& It : BinaryData)
- {
- CloudCacheResult BlobResult = Session.GetBlob(It.first);
- Log().debug("Get blob {} Bytes={} Duration={}s Result={}",
- It.first,
- BlobResult.Bytes,
- BlobResult.ElapsedSeconds,
- BlobResult.Success);
- Bytes += ObjectRefResult.Bytes;
- ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
- if (!BlobResult.Success)
- {
- return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- It.second = std::move(BlobResult.Response);
- }
-
- CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]);
- int32_t ExitCode = ResultObject["e"sv].AsInt32();
- IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
- IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
- IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment();
-
- std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize());
- std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize());
-
- if (OutputHash == IoHash::Zero)
- {
- return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with no output object"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
-
- CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]);
-
- switch (ApplyType)
- {
- case UpstreamApplyType::Simple:
- {
- std::map<std::filesystem::path, IoHash> OutputFiles;
-
- ResolveMerkleTreeDirectory(""sv, OutputHash, ObjectData, OutputFiles);
-
- return {.OutputFiles = std::move(OutputFiles),
- .FileData = std::move(BinaryData),
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr),
- .Success = true};
- }
- break;
- case UpstreamApplyType::Asset:
- {
- if (ExitCode != 0)
- {
- return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
-
- // Get build.output
- IoHash BuildOutputId;
- IoBuffer BuildOutput;
- for (auto& It : OutputObject["f"sv])
- {
- const CbObjectView FileObject = It.AsObjectView();
- if (FileObject["n"sv].AsString() == "Build.output"sv)
- {
- BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
- BuildOutput = BinaryData[BuildOutputId];
- break;
- }
- }
-
- if (BuildOutput.GetSize() == 0)
- {
- return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
-
- // Get Output directory node
- IoBuffer OutputDirectoryTree;
- for (auto& It : OutputObject["d"sv])
- {
- const CbObjectView DirectoryObject = It.AsObjectView();
- if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
- {
- OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
- break;
- }
- }
-
- if (OutputDirectoryTree.GetSize() == 0)
- {
- return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
-
- // load build.output as CbObject
-
- // Move Outputs from Horde to CbPackage
-
- std::unordered_map<IoHash, IoHash> CidToCompressedId;
- CbPackage OutputPackage;
- CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);
- int64_t TotalAttachmentBytes = 0;
- int64_t TotalRawAttachmentBytes = 0;
-
- for (auto& It : OutputDirectoryTreeObject["f"sv])
- {
- CbObjectView FileObject = It.AsObjectView();
- // Name is the uncompressed hash
- IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
- // Hash is the compressed data hash, and how it is stored in Horde
- IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
-
- if (!BinaryData.contains(CompressedId))
- {
- Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId);
- return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- CidToCompressedId[DecompressedId] = CompressedId;
- }
-
- // Iterate attachments, verify all chunks exist, and add to CbPackage
- bool AnyErrors = false;
- CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
- BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
- const IoHash DecompressedId = Field.AsHash();
- if (!CidToCompressedId.contains(DecompressedId))
- {
- Log().warn("Attachment not found {}", DecompressedId);
- AnyErrors = true;
- return;
- }
- const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
-
- if (!BinaryData.contains(CompressedId))
- {
- Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId);
- AnyErrors = true;
- return;
- }
-
- CompressedBuffer AttachmentBuffer =
- CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]));
-
- if (!AttachmentBuffer)
- {
- Log().warn(
- "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
- CompressedId,
- DecompressedId);
- AnyErrors = true;
- return;
- }
-
- TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
- TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
-
- CbAttachment Attachment(AttachmentBuffer);
- OutputPackage.AddAttachment(Attachment);
- });
-
- if (AnyErrors)
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
-
- OutputPackage.SetObject(BuildOutputObject);
-
- return {.OutputPackage = std::move(OutputPackage),
- .TotalAttachmentBytes = TotalAttachmentBytes,
- .TotalRawAttachmentBytes = TotalRawAttachmentBytes,
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr),
- .Success = true};
- }
- break;
- }
-
- return {.Error{.ErrorCode = ExitCode, .Reason = "Unknown apply type"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
- catch (std::exception& Err)
- {
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
- }
- }
-
- [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
- {
- std::string ExecutablePath;
- std::string WorkingDirectory;
- std::vector<std::string> Arguments;
- std::map<std::string, std::string> Environment;
- std::set<std::filesystem::path> InputFiles;
- std::set<std::string> Outputs;
- std::map<std::filesystem::path, IoHash> InputFileHashes;
-
- ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString();
- if (ExecutablePath.empty())
- {
- Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor",
- ApplyRecord.WorkerDescriptor.GetHash());
- return false;
- }
-
- WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString();
-
- for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
- {
- CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
- {
- return false;
- }
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
- {
- CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
- {
- return false;
- }
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv])
- {
- std::string_view Directory = It.AsString();
- std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory);
- InputFiles.insert(DummyFile);
- Data.Blobs[EmptyBufferId] = EmptyBuffer;
- InputFileHashes[DummyFile] = EmptyBufferId;
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
- {
- std::string_view Env = It.AsString();
- auto Index = Env.find('=');
- if (Index == std::string_view::npos)
- {
- Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
- return false;
- }
-
- Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
- }
-
- switch (ApplyRecord.Type)
- {
- case UpstreamApplyType::Simple:
- {
- for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv])
- {
- Arguments.push_back(std::string(It.AsString()));
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv])
- {
- Outputs.insert(std::string(It.AsString()));
- }
- }
- break;
- case UpstreamApplyType::Asset:
- {
- static const std::filesystem::path BuildActionPath = "Build.action"sv;
- static const std::filesystem::path InputPath = "Inputs"sv;
- const IoHash ActionId = ApplyRecord.Action.GetHash();
-
- Arguments.push_back("-Build=build.action");
- Outputs.insert("Build.output");
- Outputs.insert("Outputs");
-
- InputFiles.insert(BuildActionPath);
- InputFileHashes[BuildActionPath] = ActionId;
- Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
- ApplyRecord.Action.GetBuffer().GetSize());
-
- bool AnyErrors = false;
- ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
- IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
-
- if (!DataBuffer)
- {
- Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
- AnyErrors = true;
- return;
- }
-
- if (InputFiles.contains(FilePath))
- {
- return;
- }
-
- const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
-
- InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = CompressedId;
- Data.Blobs[CompressedId] = std::move(DataBuffer);
- });
-
- if (AnyErrors)
- {
- return false;
- }
- }
- break;
- }
-
- const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
-
- CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects);
- const IoHash SandboxHash = Sandbox.GetHash();
- Data.Objects[SandboxHash] = std::move(Sandbox);
-
- {
- std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
- if (HostPlatform.empty())
- {
- Log().warn("process apply upstream FAILED, 'host' platform not provided");
- return false;
- }
-
- int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32();
- int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64();
- bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();
-
- std::string Condition = fmt::format("Platform == '{}'", HostPlatform);
- if (HostPlatform == "Win64")
- {
- // TODO
- // Condition += " && Pool == 'Win-RemoteExec'";
- }
-
- std::map<std::string_view, int64_t> Resources;
- if (LogicalCores > 0)
- {
- Resources["LogicalCores"sv] = LogicalCores;
- }
- if (Memory > 0)
- {
- Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL);
- }
-
- CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
- const IoHash RequirementsId = Requirements.GetHash();
- Data.Objects[RequirementsId] = std::move(Requirements);
- Data.RequirementsId = RequirementsId;
- }
-
- CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs);
-
- const IoHash TaskId = Task.GetHash();
- Data.Objects[TaskId] = std::move(Task);
- Data.TaskId = TaskId;
-
- return true;
- }
-
- [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
- std::set<std::filesystem::path>& InputFiles,
- std::map<std::filesystem::path, IoHash>& InputFileHashes,
- std::map<IoHash, IoBuffer>& Blobs)
- {
- const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
- const IoHash ChunkId = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
- IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId);
-
- if (!DataBuffer)
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
- return false;
- }
-
- if (DataBuffer.Size() != Size)
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}",
- ChunkId,
- DataBuffer.Size(),
- Size);
- return false;
- }
-
- if (InputFiles.contains(FilePath))
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
- return false;
- }
-
- InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = ChunkId;
- Blobs[ChunkId] = std::move(DataBuffer);
- return true;
- }
-
- [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles)
- {
- static const std::filesystem::path RootPath;
- std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories;
- UpstreamDirectory RootDirectory = {.Path = RootPath};
-
- AllDirectories[RootPath] = &RootDirectory;
-
- // Build tree from flat list
- for (const auto& Path : InputFiles)
- {
- if (Path.has_parent_path())
- {
- if (!AllDirectories.contains(Path.parent_path()))
- {
- std::stack<std::string> PathSplit;
- {
- std::filesystem::path ParentPath = Path.parent_path();
- PathSplit.push(ParentPath.filename().string());
- while (ParentPath.has_parent_path())
- {
- ParentPath = ParentPath.parent_path();
- PathSplit.push(ParentPath.filename().string());
- }
- }
- UpstreamDirectory* ParentPtr = &RootDirectory;
- while (!PathSplit.empty())
- {
- if (!ParentPtr->Directories.contains(PathSplit.top()))
- {
- std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()};
- ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath};
- AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()];
- }
- ParentPtr = &ParentPtr->Directories[PathSplit.top()];
- PathSplit.pop();
- }
- }
-
- AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string());
- }
- else
- {
- RootDirectory.Files.insert(Path.filename().string());
- }
- }
-
- return RootDirectory;
- }
-
- [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
- const std::map<std::filesystem::path, IoHash>& InputFileHashes,
- const std::map<IoHash, IoBuffer>& Blobs,
- std::map<IoHash, CbObject>& Objects)
- {
- CbObjectWriter DirectoryTreeWriter;
-
- if (!RootDirectory.Files.empty())
- {
- DirectoryTreeWriter.BeginArray("f"sv);
- for (const auto& File : RootDirectory.Files)
- {
- const std::filesystem::path FilePath = {RootDirectory.Path / File};
- const IoHash& FileHash = InputFileHashes.at(FilePath);
- const uint64_t FileSize = Blobs.at(FileHash).Size();
- DirectoryTreeWriter.BeginObject();
- DirectoryTreeWriter.AddString("n"sv, File);
- DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
- DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size
- // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
- DirectoryTreeWriter.EndObject();
- }
- DirectoryTreeWriter.EndArray();
- }
-
- if (!RootDirectory.Directories.empty())
- {
- DirectoryTreeWriter.BeginArray("d"sv);
- for (const auto& Item : RootDirectory.Directories)
- {
- CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
- const IoHash DirectoryHash = Directory.GetHash();
- Objects[DirectoryHash] = std::move(Directory);
-
- DirectoryTreeWriter.BeginObject();
- DirectoryTreeWriter.AddString("n"sv, Item.first);
- DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash);
- DirectoryTreeWriter.EndObject();
- }
- DirectoryTreeWriter.EndArray();
- }
-
- return DirectoryTreeWriter.Save();
- }
-
- void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory,
- const IoHash& DirectoryHash,
- const std::map<IoHash, IoBuffer>& Objects,
- std::map<std::filesystem::path, IoHash>& OutputFiles)
- {
- CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash));
-
- for (auto& It : Directory["f"sv])
- {
- const CbObjectView FileObject = It.AsObjectView();
- const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString();
-
- OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment();
- }
-
- for (auto& It : Directory["d"sv])
- {
- const CbObjectView DirectoryObject = It.AsObjectView();
-
- ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(),
- DirectoryObject["h"sv].AsObjectAttachment(),
- Objects,
- OutputFiles);
- }
- }
-
- [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
- const std::map<std::string_view, int64_t>& Resources,
- const bool Exclusive)
- {
- CbObjectWriter Writer;
- Writer.AddString("c", Condition);
- if (!Resources.empty())
- {
- Writer.BeginArray("r");
- for (const auto& Resource : Resources)
- {
- Writer.BeginArray();
- Writer.AddString(Resource.first);
- Writer.AddInteger(Resource.second);
- Writer.EndArray();
- }
- Writer.EndArray();
- }
- Writer.AddBool("e", Exclusive);
- return Writer.Save();
- }
-
- [[nodiscard]] CbObject BuildTask(const std::string_view Executable,
- const std::vector<std::string>& Arguments,
- const std::map<std::string, std::string>& Environment,
- const std::string_view WorkingDirectory,
- const IoHash& SandboxHash,
- const IoHash& RequirementsId,
- const std::set<std::string>& Outputs)
- {
- CbObjectWriter TaskWriter;
- TaskWriter.AddString("e"sv, Executable);
-
- if (!Arguments.empty())
- {
- TaskWriter.BeginArray("a"sv);
- for (const auto& Argument : Arguments)
- {
- TaskWriter.AddString(Argument);
- }
- TaskWriter.EndArray();
- }
-
- if (!Environment.empty())
- {
- TaskWriter.BeginArray("v"sv);
- for (const auto& Env : Environment)
- {
- TaskWriter.BeginArray();
- TaskWriter.AddString(Env.first);
- TaskWriter.AddString(Env.second);
- TaskWriter.EndArray();
- }
- TaskWriter.EndArray();
- }
-
- if (!WorkingDirectory.empty())
- {
- TaskWriter.AddString("w"sv, WorkingDirectory);
- }
-
- TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
- TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
-
- // Outputs
- if (!Outputs.empty())
- {
- TaskWriter.BeginArray("o"sv);
- for (const auto& Output : Outputs)
- {
- TaskWriter.AddString(Output);
- }
- TaskWriter.EndArray();
- }
-
- return TaskWriter.Save();
- }
- };
-} // namespace detail
-
-//////////////////////////////////////////////////////////////////////////
-
struct UpstreamApplyStats
{
static constexpr uint64_t MaxSampleCount = 1000ull;
@@ -1360,7 +83,8 @@ public:
, m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_Stats(Options.StatsEnabled)
- , m_AsyncWorkPool(Options.ThreadCount)
+ , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
+ , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount)
{
}
@@ -1423,7 +147,9 @@ public:
m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
}
- m_AsyncWorkPool.ScheduleWork([this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
+ ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks();
+ m_UpstreamAsyncWorkPool.ScheduleWork(
+ [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
return {.ApplyId = ActionId, .Success = true};
}
@@ -1447,8 +173,10 @@ public:
virtual void GetStatus(CbObjectWriter& Status) override
{
- Status << "worker_threads" << m_Options.ThreadCount;
- Status << "queue_count" << m_AsyncWorkPool.PendingWork();
+ Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount;
+ Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork();
+ Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount;
+ Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork();
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
@@ -1501,11 +229,14 @@ private:
{
if (Endpoint->IsHealthy())
{
- PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
+ ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks();
+ PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
{
std::scoped_lock Lock(m_ApplyTasksMutex);
if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
{
+ Status->Timepoints.merge(Result.Timepoints);
+
if (Result.Success)
{
Status->State = UpstreamApplyState::Executing;
@@ -1553,7 +284,7 @@ private:
{
if (Endpoint->IsHealthy())
{
- GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_AsyncWorkPool);
+ GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool);
m_Stats.Add(*Endpoint, Result);
if (!Result.Success)
@@ -1572,6 +303,9 @@ private:
{
Status->State = UpstreamApplyState::Complete;
Status->Result = std::move(It2.second);
+ Status->Result.Timepoints.merge(Status->Timepoints);
+ Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks();
+ Status->Timepoints.clear();
}
}
}
@@ -1686,7 +420,8 @@ private:
std::mutex m_ApplyTasksMutex;
std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
Event m_ShutdownEvent;
- WorkerThreadPool m_AsyncWorkPool;
+ WorkerThreadPool m_UpstreamAsyncWorkPool;
+ WorkerThreadPool m_DownstreamAsyncWorkPool;
std::thread m_UpstreamUpdatesThread;
std::thread m_EndpointMonitorThread;
RunState m_RunState;
@@ -1700,24 +435,6 @@ UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, C
return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore);
}
-std::unique_ptr<UpstreamApplyEndpoint>
-UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
- CidStore& CidStore,
- AuthMgr& Mgr)
-{
- return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions,
- ComputeAuthConfig,
- StorageOptions,
- StorageAuthConfig,
- CasStore,
- CidStore,
- Mgr);
-}
-
} // namespace zen
#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index 9cea88c56..c6e38142c 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -10,10 +10,8 @@
# include <zencore/stats.h>
# include <zencore/zencore.h>
-# include <atomic>
# include <chrono>
# include <map>
-# include <memory>
# include <unordered_map>
# include <unordered_set>
@@ -44,17 +42,19 @@ enum class UpstreamApplyType
struct UpstreamApplyRecord
{
- CbObject WorkerDescriptor;
- CbObject Action;
- UpstreamApplyType Type;
+ CbObject WorkerDescriptor;
+ CbObject Action;
+ UpstreamApplyType Type;
+ std::map<std::string, uint64_t> Timepoints{};
};
struct UpstreamApplyOptions
{
std::chrono::seconds HealthCheckInterval{5};
std::chrono::seconds UpdatesInterval{5};
- uint32_t ThreadCount = 4;
- bool StatsEnabled = false;
+ uint32_t UpstreamThreadCount = 4;
+ uint32_t DownstreamThreadCount = 4;
+ bool StatsEnabled = false;
};
struct UpstreamApplyError
@@ -67,31 +67,33 @@ struct UpstreamApplyError
struct PostUpstreamApplyResult
{
- UpstreamApplyError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ std::map<std::string, uint64_t> Timepoints{};
+ bool Success = false;
};
struct GetUpstreamApplyResult
{
// UpstreamApplyType::Simple
- std::map<std::filesystem::path, IoHash> OutputFiles;
- std::map<IoHash, IoBuffer> FileData;
+ std::map<std::filesystem::path, IoHash> OutputFiles{};
+ std::map<IoHash, IoBuffer> FileData{};
// UpstreamApplyType::Asset
CbPackage OutputPackage{};
int64_t TotalAttachmentBytes{};
int64_t TotalRawAttachmentBytes{};
- UpstreamApplyError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- std::string StdOut{};
- std::string StdErr{};
- std::string Agent{};
- std::string Detail{};
- bool Success = false;
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ std::string StdOut{};
+ std::string StdErr{};
+ std::string Agent{};
+ std::string Detail{};
+ std::map<std::string, uint64_t> Timepoints{};
+ bool Success = false;
};
using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>;
@@ -110,6 +112,7 @@ struct UpstreamApplyStatus
UpstreamApplyState State{};
GetUpstreamApplyResult Result{};
std::chrono::steady_clock::time_point ExpireTime{};
+ std::map<std::string, uint64_t> Timepoints{};
};
using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>;
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 3ac5b9992..abaec888a 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -850,7 +850,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
// Horde compute upstream
- if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.JupiterConfig.Url.empty() == false)
+ if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false)
{
ZEN_INFO("instantiating compute service");
@@ -861,8 +861,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
.ServiceUrl = UpstreamConfig.HordeConfig.Url,
.ComputeCluster = UpstreamConfig.HordeConfig.Cluster,
.ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds),
- .UseLegacyDdc = false};
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl,
.OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId,
@@ -872,16 +871,16 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
auto StorageOptions =
zen::CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl,
.BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace,
.ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
.Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
- auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
- .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
- .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
- .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
- .AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
+ auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl,
+ .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
+ .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore,
*m_CidStore,
@@ -893,7 +892,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
}
else
{
- ZEN_INFO("NOT instantiating compute service (missing Horde or Jupiter config)");
+ ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)");
}
}
#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp
index 030b1159d..ebc0806d5 100644
--- a/zenstore-test/zenstore-test.cpp
+++ b/zenstore-test/zenstore-test.cpp
@@ -1,7 +1,7 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zencore/logging.h>
#include <zencore/filesystem.h>
+#include <zencore/logging.h>
#include <zencore/zencore.h>
#include <zenstore/zenstore.h>